Compare commits
5 Commits
0e55223973
...
a238cc5fcc
Author | SHA1 | Date | |
---|---|---|---|
![]() |
a238cc5fcc | ||
![]() |
c614e68280 | ||
![]() |
1bf5868fce | ||
![]() |
6020ca43ba | ||
![]() |
6130cfb724 |
|
@ -6,3 +6,11 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
ccutils = { git = "https://git.zhgsun.com:8089/jiachao2130/ccutils.git", version = "0.1.0" }
|
||||
clap = { version = "4.0", features = ["derive"] }
|
||||
lazy_static = { version = "1.5" }
|
||||
reqwest = { version = "0.12", features = ["stream"] }
|
||||
scraper = { version = "0.19" }
|
||||
serde = { version = "1", features = ["serde_derive"] }
|
||||
tokio = { version = "1.38" }
|
||||
toml = { version = "0.8" }
|
||||
|
|
31
src/cli.rs
Normal file
31
src/cli.rs
Normal file
|
@ -0,0 +1,31 @@
|
|||
use clap::Parser;
|
||||
|
||||
/// cuweb-syncer 是一个用于从远程 web 服务器同步文件至本地的命令行工具。
|
||||
#[derive(Clone, Debug, Parser)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
pub(crate) struct Cli {
|
||||
/// 在终端输出当前的配置文件内容
|
||||
#[arg(long, default_value_t = false)]
|
||||
pub config: bool,
|
||||
|
||||
/// 是否开启 debug 模式,将打印更详尽的日志信息
|
||||
#[arg(long, default_value_t = false)]
|
||||
pub debug: bool,
|
||||
|
||||
/// 静默模式,如任务运行正常则无任何信息输出
|
||||
#[arg(long, default_value_t = false)]
|
||||
pub quiet: bool,
|
||||
|
||||
/// 可选项,需配合 `dest` 参数使用,只运行一次指定的同步任务,指定服务器地址
|
||||
#[arg(short, long)]
|
||||
pub from: Option<String>,
|
||||
|
||||
/// 可选项,需配合 `from` 参数使用,只运行一次指定的同步任务,指定同步的本地路径
|
||||
#[arg(short, long)]
|
||||
pub dest: Option<String>,
|
||||
}
|
||||
|
||||
/// 从命令行环境变量读取并转换为 `Cli`
|
||||
pub(crate) fn parse() -> Cli {
|
||||
Cli::parse()
|
||||
}
|
141
src/config.rs
Normal file
141
src/config.rs
Normal file
|
@ -0,0 +1,141 @@
|
|||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::io::Write;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Config 结构体包含一个 HashMap,其中键是 String 类型,值是 WebSycnConf 类型。
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
|
||||
pub(crate) struct Config {
|
||||
pub inner: HashMap<String, WebSycnConf>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl Config {
|
||||
/// 创建一个新的 Config 实例。
|
||||
///
|
||||
/// # 返回值
|
||||
/// 返回一个包含空 HashMap 的 Config 实例。
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inner: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// 从指定路径加载配置文件。
|
||||
///
|
||||
/// 该方法尝试从给定路径读取文件,并解析 TOML 格式的数据。
|
||||
///
|
||||
/// # 参数
|
||||
/// - `path`: 配置文件的路径。
|
||||
///
|
||||
/// # 返回值
|
||||
/// 返回一个 `Result`,成功时包含 `Config` 实例,失败时包含错误信息。
|
||||
pub fn load(path: &str) -> crate::Result<Self> {
|
||||
// 读取文件内容
|
||||
let data = std::fs::read_to_string(path)?;
|
||||
|
||||
// 解析 TOML 数据
|
||||
match toml::from_str::<Config>(&data) {
|
||||
Ok(config) => Ok(config),
|
||||
Err(e) => Err(Box::new(e)),
|
||||
}
|
||||
}
|
||||
|
||||
/// 将当前配置保存到指定路径。
|
||||
///
|
||||
/// 该方法将当前配置序列化为 TOML 格式,并写入到指定路径的文件中。
|
||||
///
|
||||
/// # 参数
|
||||
/// - `path`: 配置文件的保存路径。
|
||||
///
|
||||
/// # 返回值
|
||||
/// 返回一个 `Result`,成功时返回 `Ok(())`,失败时返回错误信息。
|
||||
pub fn save(&self, path: &str) -> crate::Result<()> {
|
||||
// 将配置序列化为 TOML 格式的字符串
|
||||
let data = toml::to_string_pretty(&self)?;
|
||||
|
||||
// 打开文件并写入数据
|
||||
let mut config = fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(path)?;
|
||||
config.write_all(data.as_bytes())?;
|
||||
config.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// WebSycnConf 结构体表示一个同步配置,包括源路径和目标路径。
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
|
||||
pub(crate) struct WebSycnConf {
|
||||
/// 源路径
|
||||
from: String,
|
||||
/// 目标路径
|
||||
dest: String,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl WebSycnConf {
|
||||
/// 创建一个新的 WebSycnConf 实例。
|
||||
///
|
||||
/// # 参数
|
||||
/// - `from`: 源路径
|
||||
/// - `dest`: 目标路径
|
||||
///
|
||||
/// # 返回值
|
||||
/// 返回一个新的 WebSycnConf 实例。
|
||||
pub fn new(from: String, dest: String) -> Self {
|
||||
WebSycnConf { from, dest }
|
||||
}
|
||||
|
||||
/// 获取源路径。
|
||||
///
|
||||
/// # 返回值
|
||||
/// 返回源路径的引用。
|
||||
pub fn from(&self) -> &str {
|
||||
&self.from
|
||||
}
|
||||
|
||||
/// 获取目标路径。
|
||||
///
|
||||
/// # 返回值
|
||||
/// 返回目标路径的引用。
|
||||
pub fn dest(&self) -> &str {
|
||||
&self.dest
|
||||
}
|
||||
}
|
||||
|
||||
/// Round-trips a [`Config`] through `save` and `load` and checks that the
/// loaded value equals the original.
#[test]
fn config_test() {
    let cvrfsyncer = WebSycnConf::new(
        "http://mirrors.ustc.edu.cn/openeuler/security/data/cvrf/".to_string(),
        "./test/cvrfs".to_string(),
    );

    let srpmsyncer = WebSycnConf::new(
        "http://mirrors.ustc.edu.cn/culinux/3.0/source/".to_string(),
        "./test/srpms".to_string(),
    );

    let mut config = Config::new();
    config.inner.insert("cvrf".to_string(), cvrfsyncer);
    config.inner.insert("srpms".to_string(), srpmsyncer);

    // Write to a per-process file under the system temp directory so the test
    // does not depend on a `./test` directory existing in the working directory.
    let conf_path =
        std::env::temp_dir().join(format!("cuwebsyncer-test-{}.toml", std::process::id()));
    let conf_file = conf_path.to_str().expect("temp path should be valid UTF-8");

    // Saving must succeed.
    config.save(conf_file).expect("failed to save the config");

    // Load it back, then clean up before asserting so a failed assertion does
    // not leave the temporary file behind.
    let loaded = Config::load(conf_file);
    std::fs::remove_file(&conf_path).expect("failed to remove the test config file");

    // Both values must be equal.
    assert_eq!(config, loaded.expect("failed to load the config"));
}
|
52
src/lib.rs
Normal file
52
src/lib.rs
Normal file
|
@ -0,0 +1,52 @@
|
|||
use std::path::Path;
|
||||
|
||||
pub use ccutils::Result;
|
||||
|
||||
mod cli;
|
||||
mod config;
|
||||
use config::{Config, WebSycnConf};
|
||||
mod syncer;
|
||||
|
||||
/// Capacity of the task channel and number of permits in the download
/// semaphore created by the `syncer` module.
pub const MAX_TASKS: usize = 10;
|
||||
/// Retry limit: a task whose retry counter exceeds this value is reported as
/// failed by the dispatcher instead of being downloaded again.
pub const RETRIES: usize = 5;
|
||||
/// Default configuration file path; `cumain` falls back to the relative path
/// `cuweb-syncer` when this is not an existing file.
pub const CONF: &str = "/etc/cuvars/cuweb-syncer";
|
||||
|
||||
pub fn cumain() -> Result<()> {
|
||||
let cli = cli::parse();
|
||||
|
||||
// 设置日志级别
|
||||
match (cli.debug, cli.quiet) {
|
||||
(true, _) => std::env::set_var("RUST_LOG", "debug"),
|
||||
(false, true) => std::env::set_var("RUST_LOG", "error"),
|
||||
_ => {}
|
||||
}
|
||||
ccutils::set_up_logging()?;
|
||||
|
||||
let config = {
|
||||
if cli.from.is_some() && cli.dest.is_some() {
|
||||
let mut config = Config::new();
|
||||
let websync_conf =
|
||||
WebSycnConf::new(cli.from.clone().unwrap(), cli.dest.clone().unwrap());
|
||||
config.inner.insert("Task".to_string(), websync_conf);
|
||||
config
|
||||
} else {
|
||||
let default = Path::new(CONF);
|
||||
let conf = if default.is_file() {
|
||||
CONF
|
||||
} else {
|
||||
"cuweb-syncer"
|
||||
};
|
||||
match Config::load(conf) {
|
||||
Ok(conf) => conf,
|
||||
Err(e) => {
|
||||
panic!("Failed to load {conf}: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let rt = ccutils::async_runtime()?;
|
||||
|
||||
let _ = rt.block_on(syncer::run(&config))?;
|
||||
Ok(())
|
||||
}
|
|
@ -1,3 +1,3 @@
|
|||
fn main() {
|
||||
println!("Hello, world!");
|
||||
/// Binary entry point; delegates all work to the library's `cumain`.
fn main() -> cuweb_syncer::Result<()> {
    cuweb_syncer::cumain()?;
    Ok(())
}
|
||||
|
|
249
src/syncer.rs
Normal file
249
src/syncer.rs
Normal file
|
@ -0,0 +1,249 @@
|
|||
use std::{
|
||||
sync::{Arc, Mutex},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crate::{config::Config, MAX_TASKS, RETRIES};
|
||||
use ccutils::{
|
||||
file::async_download,
|
||||
tracing::{debug, error, info, warn},
|
||||
};
|
||||
use lazy_static::lazy_static;
|
||||
use reqwest::Client;
|
||||
use scraper::{Html, Selector};
|
||||
use tokio::{
|
||||
fs,
|
||||
sync::{
|
||||
mpsc::{self, Receiver, Sender},
|
||||
Semaphore,
|
||||
},
|
||||
time::sleep,
|
||||
};
|
||||
|
||||
lazy_static! {
    /// Task counter used to track how many download tasks have been queued.
    static ref COUNTER: Arc<Mutex<usize>> = Arc::new(Mutex::default());
}
|
||||
|
||||
/// 服务器结构体,用于管理下载任务
|
||||
struct Server {
|
||||
pub task_sender: Sender<Task>,
|
||||
task_recv: Option<Receiver<Task>>,
|
||||
res_sender: Sender<Result<Task, Task>>,
|
||||
res_recv: Option<Receiver<Result<Task, Task>>>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
/// 创建一个新的服务器实例
|
||||
pub fn new() -> Self {
|
||||
let (task_sender, task_recv) = mpsc::channel(MAX_TASKS);
|
||||
let (res_sender, res_recv) = mpsc::channel(100);
|
||||
let task_recv = Some(task_recv);
|
||||
let res_recv = Some(res_recv);
|
||||
|
||||
Server {
|
||||
task_sender,
|
||||
task_recv,
|
||||
res_sender,
|
||||
res_recv,
|
||||
}
|
||||
}
|
||||
|
||||
/// 运行服务器,处理下载任务
|
||||
pub async fn run(&mut self, config: &Config) -> crate::Result<()> {
|
||||
let mut task_recv = self.task_recv.take().unwrap();
|
||||
let mut res_recv = self.res_recv.take().unwrap();
|
||||
|
||||
// 只有 `Server::result` 退出,才是正常,即所有的任务均完成、或多次失败后不再重试
|
||||
tokio::select! {
|
||||
_ = self.websync(config) => {},
|
||||
_ = Server::dispatcher(&self.task_sender, &mut task_recv, &self.res_sender) => {},
|
||||
_ = Server::result(&mut res_recv) => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// 读取配置文件并开始执行访问、下载
|
||||
async fn websync(&self, config: &Config) -> crate::Result<()> {
|
||||
let websyncers = &config.inner;
|
||||
for (task, conf) in websyncers {
|
||||
info!("Start to run {task} sync task...");
|
||||
Server::download_directory(
|
||||
self.task_sender.clone(),
|
||||
&conf.from(),
|
||||
&conf.dest(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
// 这里暂没想好如何处理,总之不能退出
|
||||
sleep(Duration::from_secs(1000000)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 调度任务,将任务分配给下载器
|
||||
async fn dispatcher(
|
||||
task_sender: &Sender<Task>,
|
||||
task_recv: &mut Receiver<Task>,
|
||||
res_sender: &Sender<Result<Task, Task>>,
|
||||
) -> crate::Result<()> {
|
||||
debug!("Server::dispacher starting...");
|
||||
// 初始化一个信号量,使最多同时有 `MAX_TASKS` 个下载任务
|
||||
let semaphore = Arc::new(Semaphore::new(MAX_TASKS));
|
||||
while let Some(mut task) = task_recv.recv().await {
|
||||
// 失败次数超过 `RETRIES` 后即不再尝试下载
|
||||
if task.retries > RETRIES {
|
||||
error!("Retried {RETRIES} times, failed to download {}", task.url);
|
||||
let _ = res_sender.send(Err(task)).await;
|
||||
} else {
|
||||
task.retries += 1;
|
||||
// 只有同时存在的任务数小于 `MAX_TASKS` 时才会开始下载
|
||||
let _semaphore = semaphore.clone();
|
||||
let _permit = _semaphore.acquire().await?;
|
||||
debug!("avaliable task num: {}", _semaphore.available_permits());
|
||||
tokio::spawn(Server::download(
|
||||
task_sender.clone(),
|
||||
res_sender.clone(),
|
||||
task,
|
||||
));
|
||||
// 每次连接间隔 100 ms
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn download_directory(
|
||||
task_sender: Sender<Task>,
|
||||
url: &str,
|
||||
path: &str,
|
||||
) -> crate::Result<()> {
|
||||
let client = Client::new();
|
||||
let response = client.get(url).send().await?.text().await?;
|
||||
let document = Html::parse_document(&response);
|
||||
let selector = Selector::parse("a").unwrap();
|
||||
|
||||
fs::create_dir_all(path).await?;
|
||||
debug!("Create local directory: {path}");
|
||||
for element in document.select(&selector) {
|
||||
if let Some(href) = element.value().attr("href") {
|
||||
if href.starts_with("../") {
|
||||
continue;
|
||||
}
|
||||
// 处理目录的情况
|
||||
if href.ends_with('/') {
|
||||
let new_url = format!("{}/{}", url.trim_end_matches('/'), href);
|
||||
debug!("Found directory: {new_url}");
|
||||
let new_path = format!("{}/{}", path.trim_end_matches('/'), href);
|
||||
// 因为是递归,所以需要 `pin` 起来
|
||||
let future = Box::pin(Server::download_directory(
|
||||
task_sender.clone(),
|
||||
&new_url,
|
||||
&new_path,
|
||||
));
|
||||
future.await?;
|
||||
} else {
|
||||
// 若是文件,则直接下载
|
||||
let file_url = format!("{}/{}", url.trim_end_matches('/'), href);
|
||||
let file_path = format!("{}/{}", path.trim_end_matches('/'), href);
|
||||
let task = Task::new(file_url, file_path);
|
||||
// 此处使用 COUNTER 计数,即分发下载的任务计数
|
||||
{
|
||||
let mut counter = COUNTER.lock().unwrap();
|
||||
*counter += 1;
|
||||
}
|
||||
task_sender.send(task).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 下载任务,处理单个文件的下载
|
||||
async fn download(
|
||||
task_sender: Sender<Task>,
|
||||
res_sender: Sender<Result<Task, Task>>,
|
||||
task: Task,
|
||||
) -> crate::Result<()> {
|
||||
match async_download(task.url.clone(), Some(task.target.clone())).await {
|
||||
// 若下载成功,则发送至 res channel。
|
||||
Ok(()) => {
|
||||
debug!("Successed synced {}", task.url);
|
||||
match res_sender.send(Ok(task)).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("Send task failed: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
// 如果下载失败,则发回 dispatcher 进行重试
|
||||
Err(e) => {
|
||||
warn!("Failed to download {}: {}", task.url, e);
|
||||
match task_sender.send(task).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("Send task res failed: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn result(res_recv: &mut Receiver<Result<Task, Task>>) -> crate::Result<()> {
|
||||
debug!("Server::result() is running...");
|
||||
let mut successed = vec![];
|
||||
let mut failed = vec![];
|
||||
while let Some(res) = res_recv.recv().await {
|
||||
match res {
|
||||
Ok(task) => {
|
||||
successed.push(task);
|
||||
}
|
||||
Err(task) => failed.push(task),
|
||||
}
|
||||
|
||||
// 如果所有的下载任务已完成,则返回并退出
|
||||
let counter = COUNTER.lock().unwrap();
|
||||
if *counter == successed.len() + failed.len() {
|
||||
info!(
|
||||
"All {} tasks done, successed {}, failed {}.",
|
||||
counter,
|
||||
successed.len(),
|
||||
failed.len()
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A single file download job.
struct Task {
    /// Remote URL of the file.
    pub url: String,
    /// Local target path handed to the downloader.
    pub target: String,
    /// Retry counter; starts at zero.
    pub retries: usize,
}

impl Task {
    /// Creates a task for `url` to be stored at `target`, with a retry
    /// counter of zero.
    pub fn new(url: String, target: String) -> Self {
        Self {
            url,
            target,
            retries: 0,
        }
    }
}
|
||||
|
||||
/// 开始从配置 `config` 里执行同步任务
|
||||
pub(crate) async fn run(config: &Config) -> crate::Result<()> {
|
||||
info!("CUWEB-SYNCER starting...");
|
||||
let mut server = Server::new();
|
||||
server.run(config).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in New Issue
Block a user