Compare commits
5 Commits
0e55223973
...
a238cc5fcc
Author | SHA1 | Date | |
---|---|---|---|
![]() |
a238cc5fcc | ||
![]() |
c614e68280 | ||
![]() |
1bf5868fce | ||
![]() |
6020ca43ba | ||
![]() |
6130cfb724 |
|
@ -6,3 +6,11 @@ edition = "2021"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
ccutils = { git = "https://git.zhgsun.com:8089/jiachao2130/ccutils.git", version = "0.1.0" }
|
||||||
|
clap = { version = "4.0", features = ["derive"] }
|
||||||
|
lazy_static = { version = "1.5" }
|
||||||
|
reqwest = { version = "0.12", features = ["stream"] }
|
||||||
|
scraper = { version = "0.19" }
|
||||||
|
serde = { version = "1", features = ["serde_derive"] }
|
||||||
|
tokio = { version = "1.38" }
|
||||||
|
toml = { version = "0.8" }
|
||||||
|
|
31
src/cli.rs
Normal file
31
src/cli.rs
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
use clap::Parser;
|
||||||
|
|
||||||
|
/// cuweb-syncer 是一个用于从远程 web 服务器同步文件至本地的命令行工具。
|
||||||
|
#[derive(Clone, Debug, Parser)]
|
||||||
|
#[command(author, version, about, long_about = None)]
|
||||||
|
pub(crate) struct Cli {
|
||||||
|
/// 在终端输出当前的配置文件内容
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
pub config: bool,
|
||||||
|
|
||||||
|
/// 是否开启 debug 模式,将打印更详尽的日志信息
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
pub debug: bool,
|
||||||
|
|
||||||
|
/// 静默模式,如任务运行正常则无任何信息输出
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
pub quiet: bool,
|
||||||
|
|
||||||
|
/// 可选项,需配合 `dest` 参数使用,只运行一次指定的同步任务,指定服务器地址
|
||||||
|
#[arg(short, long)]
|
||||||
|
pub from: Option<String>,
|
||||||
|
|
||||||
|
/// 可选项,需配合 `from` 参数使用,只运行一次指定的同步任务,指定同步的本地路径
|
||||||
|
#[arg(short, long)]
|
||||||
|
pub dest: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 从命令行环境变量读取并转换为 `Cli`
|
||||||
|
pub(crate) fn parse() -> Cli {
|
||||||
|
Cli::parse()
|
||||||
|
}
|
141
src/config.rs
Normal file
141
src/config.rs
Normal file
|
@ -0,0 +1,141 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::fs;
|
||||||
|
use std::io::Write;
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Config 结构体包含一个 HashMap,其中键是 String 类型,值是 WebSycnConf 类型。
|
||||||
|
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
|
||||||
|
pub(crate) struct Config {
|
||||||
|
pub inner: HashMap<String, WebSycnConf>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
impl Config {
|
||||||
|
/// 创建一个新的 Config 实例。
|
||||||
|
///
|
||||||
|
/// # 返回值
|
||||||
|
/// 返回一个包含空 HashMap 的 Config 实例。
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 从指定路径加载配置文件。
|
||||||
|
///
|
||||||
|
/// 该方法尝试从给定路径读取文件,并解析 TOML 格式的数据。
|
||||||
|
///
|
||||||
|
/// # 参数
|
||||||
|
/// - `path`: 配置文件的路径。
|
||||||
|
///
|
||||||
|
/// # 返回值
|
||||||
|
/// 返回一个 `Result`,成功时包含 `Config` 实例,失败时包含错误信息。
|
||||||
|
pub fn load(path: &str) -> crate::Result<Self> {
|
||||||
|
// 读取文件内容
|
||||||
|
let data = std::fs::read_to_string(path)?;
|
||||||
|
|
||||||
|
// 解析 TOML 数据
|
||||||
|
match toml::from_str::<Config>(&data) {
|
||||||
|
Ok(config) => Ok(config),
|
||||||
|
Err(e) => Err(Box::new(e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 将当前配置保存到指定路径。
|
||||||
|
///
|
||||||
|
/// 该方法将当前配置序列化为 TOML 格式,并写入到指定路径的文件中。
|
||||||
|
///
|
||||||
|
/// # 参数
|
||||||
|
/// - `path`: 配置文件的保存路径。
|
||||||
|
///
|
||||||
|
/// # 返回值
|
||||||
|
/// 返回一个 `Result`,成功时返回 `Ok(())`,失败时返回错误信息。
|
||||||
|
pub fn save(&self, path: &str) -> crate::Result<()> {
|
||||||
|
// 将配置序列化为 TOML 格式的字符串
|
||||||
|
let data = toml::to_string_pretty(&self)?;
|
||||||
|
|
||||||
|
// 打开文件并写入数据
|
||||||
|
let mut config = fs::OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.write(true)
|
||||||
|
.create(true)
|
||||||
|
.open(path)?;
|
||||||
|
config.write_all(data.as_bytes())?;
|
||||||
|
config.flush()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// WebSycnConf 结构体表示一个同步配置,包括源路径和目标路径。
|
||||||
|
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
|
||||||
|
pub(crate) struct WebSycnConf {
|
||||||
|
/// 源路径
|
||||||
|
from: String,
|
||||||
|
/// 目标路径
|
||||||
|
dest: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
impl WebSycnConf {
|
||||||
|
/// 创建一个新的 WebSycnConf 实例。
|
||||||
|
///
|
||||||
|
/// # 参数
|
||||||
|
/// - `from`: 源路径
|
||||||
|
/// - `dest`: 目标路径
|
||||||
|
///
|
||||||
|
/// # 返回值
|
||||||
|
/// 返回一个新的 WebSycnConf 实例。
|
||||||
|
pub fn new(from: String, dest: String) -> Self {
|
||||||
|
WebSycnConf { from, dest }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 获取源路径。
|
||||||
|
///
|
||||||
|
/// # 返回值
|
||||||
|
/// 返回源路径的引用。
|
||||||
|
pub fn from(&self) -> &str {
|
||||||
|
&self.from
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 获取目标路径。
|
||||||
|
///
|
||||||
|
/// # 返回值
|
||||||
|
/// 返回目标路径的引用。
|
||||||
|
pub fn dest(&self) -> &str {
|
||||||
|
&self.dest
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn config_test() {
|
||||||
|
let cvrfsyncer = WebSycnConf::new(
|
||||||
|
"http://mirrors.ustc.edu.cn/openeuler/security/data/cvrf/".to_string(),
|
||||||
|
"./test/cvrfs".to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let srpmsyncer = WebSycnConf::new(
|
||||||
|
"http://mirrors.ustc.edu.cn/culinux/3.0/source/".to_string(),
|
||||||
|
"./test/srpms".to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut config = Config::new();
|
||||||
|
config.inner.insert("cvrf".to_string(), cvrfsyncer);
|
||||||
|
config.inner.insert("srpms".to_string(), srpmsyncer);
|
||||||
|
|
||||||
|
let conf_file = "./test/cuwebsyncer.toml";
|
||||||
|
// 首先测试写入配置文件
|
||||||
|
match config.save(conf_file) {
|
||||||
|
Ok(()) => assert!(true),
|
||||||
|
Err(e) => assert_eq!("", format!("{e}")),
|
||||||
|
}
|
||||||
|
|
||||||
|
// 再测试读取
|
||||||
|
let new_config = Config::load(conf_file).unwrap();
|
||||||
|
|
||||||
|
// 两个结果相等
|
||||||
|
assert_eq!(config, new_config);
|
||||||
|
|
||||||
|
// 删除测试用文件
|
||||||
|
std::fs::remove_file(conf_file).unwrap();
|
||||||
|
}
|
52
src/lib.rs
Normal file
52
src/lib.rs
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
pub use ccutils::Result;
|
||||||
|
|
||||||
|
mod cli;
|
||||||
|
mod config;
|
||||||
|
use config::{Config, WebSycnConf};
|
||||||
|
mod syncer;
|
||||||
|
|
||||||
|
pub const MAX_TASKS: usize = 10;
|
||||||
|
pub const RETRIES: usize = 5;
|
||||||
|
pub const CONF: &str = "/etc/cuvars/cuweb-syncer";
|
||||||
|
|
||||||
|
pub fn cumain() -> Result<()> {
|
||||||
|
let cli = cli::parse();
|
||||||
|
|
||||||
|
// 设置日志级别
|
||||||
|
match (cli.debug, cli.quiet) {
|
||||||
|
(true, _) => std::env::set_var("RUST_LOG", "debug"),
|
||||||
|
(false, true) => std::env::set_var("RUST_LOG", "error"),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
ccutils::set_up_logging()?;
|
||||||
|
|
||||||
|
let config = {
|
||||||
|
if cli.from.is_some() && cli.dest.is_some() {
|
||||||
|
let mut config = Config::new();
|
||||||
|
let websync_conf =
|
||||||
|
WebSycnConf::new(cli.from.clone().unwrap(), cli.dest.clone().unwrap());
|
||||||
|
config.inner.insert("Task".to_string(), websync_conf);
|
||||||
|
config
|
||||||
|
} else {
|
||||||
|
let default = Path::new(CONF);
|
||||||
|
let conf = if default.is_file() {
|
||||||
|
CONF
|
||||||
|
} else {
|
||||||
|
"cuweb-syncer"
|
||||||
|
};
|
||||||
|
match Config::load(conf) {
|
||||||
|
Ok(conf) => conf,
|
||||||
|
Err(e) => {
|
||||||
|
panic!("Failed to load {conf}: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let rt = ccutils::async_runtime()?;
|
||||||
|
|
||||||
|
let _ = rt.block_on(syncer::run(&config))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -1,3 +1,3 @@
|
||||||
fn main() {
|
fn main() -> cuweb_syncer::Result<()> {
|
||||||
println!("Hello, world!");
|
cuweb_syncer::cumain()
|
||||||
}
|
}
|
||||||
|
|
249
src/syncer.rs
Normal file
249
src/syncer.rs
Normal file
|
@ -0,0 +1,249 @@
|
||||||
|
use std::{
|
||||||
|
sync::{Arc, Mutex},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{config::Config, MAX_TASKS, RETRIES};
|
||||||
|
use ccutils::{
|
||||||
|
file::async_download,
|
||||||
|
tracing::{debug, error, info, warn},
|
||||||
|
};
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
use reqwest::Client;
|
||||||
|
use scraper::{Html, Selector};
|
||||||
|
use tokio::{
|
||||||
|
fs,
|
||||||
|
sync::{
|
||||||
|
mpsc::{self, Receiver, Sender},
|
||||||
|
Semaphore,
|
||||||
|
},
|
||||||
|
time::sleep,
|
||||||
|
};
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
/// 任务计数器,用于跟踪任务数量
|
||||||
|
static ref COUNTER: Arc<Mutex<usize>> = Arc::new(Mutex::new(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 服务器结构体,用于管理下载任务
|
||||||
|
struct Server {
|
||||||
|
pub task_sender: Sender<Task>,
|
||||||
|
task_recv: Option<Receiver<Task>>,
|
||||||
|
res_sender: Sender<Result<Task, Task>>,
|
||||||
|
res_recv: Option<Receiver<Result<Task, Task>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Server {
|
||||||
|
/// 创建一个新的服务器实例
|
||||||
|
pub fn new() -> Self {
|
||||||
|
let (task_sender, task_recv) = mpsc::channel(MAX_TASKS);
|
||||||
|
let (res_sender, res_recv) = mpsc::channel(100);
|
||||||
|
let task_recv = Some(task_recv);
|
||||||
|
let res_recv = Some(res_recv);
|
||||||
|
|
||||||
|
Server {
|
||||||
|
task_sender,
|
||||||
|
task_recv,
|
||||||
|
res_sender,
|
||||||
|
res_recv,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 运行服务器,处理下载任务
|
||||||
|
pub async fn run(&mut self, config: &Config) -> crate::Result<()> {
|
||||||
|
let mut task_recv = self.task_recv.take().unwrap();
|
||||||
|
let mut res_recv = self.res_recv.take().unwrap();
|
||||||
|
|
||||||
|
// 只有 `Server::result` 退出,才是正常,即所有的任务均完成、或多次失败后不再重试
|
||||||
|
tokio::select! {
|
||||||
|
_ = self.websync(config) => {},
|
||||||
|
_ = Server::dispatcher(&self.task_sender, &mut task_recv, &self.res_sender) => {},
|
||||||
|
_ = Server::result(&mut res_recv) => {}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// 读取配置文件并开始执行访问、下载
|
||||||
|
async fn websync(&self, config: &Config) -> crate::Result<()> {
|
||||||
|
let websyncers = &config.inner;
|
||||||
|
for (task, conf) in websyncers {
|
||||||
|
info!("Start to run {task} sync task...");
|
||||||
|
Server::download_directory(
|
||||||
|
self.task_sender.clone(),
|
||||||
|
&conf.from(),
|
||||||
|
&conf.dest(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
// 这里暂没想好如何处理,总之不能退出
|
||||||
|
sleep(Duration::from_secs(1000000)).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 调度任务,将任务分配给下载器
|
||||||
|
async fn dispatcher(
|
||||||
|
task_sender: &Sender<Task>,
|
||||||
|
task_recv: &mut Receiver<Task>,
|
||||||
|
res_sender: &Sender<Result<Task, Task>>,
|
||||||
|
) -> crate::Result<()> {
|
||||||
|
debug!("Server::dispacher starting...");
|
||||||
|
// 初始化一个信号量,使最多同时有 `MAX_TASKS` 个下载任务
|
||||||
|
let semaphore = Arc::new(Semaphore::new(MAX_TASKS));
|
||||||
|
while let Some(mut task) = task_recv.recv().await {
|
||||||
|
// 失败次数超过 `RETRIES` 后即不再尝试下载
|
||||||
|
if task.retries > RETRIES {
|
||||||
|
error!("Retried {RETRIES} times, failed to download {}", task.url);
|
||||||
|
let _ = res_sender.send(Err(task)).await;
|
||||||
|
} else {
|
||||||
|
task.retries += 1;
|
||||||
|
// 只有同时存在的任务数小于 `MAX_TASKS` 时才会开始下载
|
||||||
|
let _semaphore = semaphore.clone();
|
||||||
|
let _permit = _semaphore.acquire().await?;
|
||||||
|
debug!("avaliable task num: {}", _semaphore.available_permits());
|
||||||
|
tokio::spawn(Server::download(
|
||||||
|
task_sender.clone(),
|
||||||
|
res_sender.clone(),
|
||||||
|
task,
|
||||||
|
));
|
||||||
|
// 每次连接间隔 100 ms
|
||||||
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn download_directory(
|
||||||
|
task_sender: Sender<Task>,
|
||||||
|
url: &str,
|
||||||
|
path: &str,
|
||||||
|
) -> crate::Result<()> {
|
||||||
|
let client = Client::new();
|
||||||
|
let response = client.get(url).send().await?.text().await?;
|
||||||
|
let document = Html::parse_document(&response);
|
||||||
|
let selector = Selector::parse("a").unwrap();
|
||||||
|
|
||||||
|
fs::create_dir_all(path).await?;
|
||||||
|
debug!("Create local directory: {path}");
|
||||||
|
for element in document.select(&selector) {
|
||||||
|
if let Some(href) = element.value().attr("href") {
|
||||||
|
if href.starts_with("../") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// 处理目录的情况
|
||||||
|
if href.ends_with('/') {
|
||||||
|
let new_url = format!("{}/{}", url.trim_end_matches('/'), href);
|
||||||
|
debug!("Found directory: {new_url}");
|
||||||
|
let new_path = format!("{}/{}", path.trim_end_matches('/'), href);
|
||||||
|
// 因为是递归,所以需要 `pin` 起来
|
||||||
|
let future = Box::pin(Server::download_directory(
|
||||||
|
task_sender.clone(),
|
||||||
|
&new_url,
|
||||||
|
&new_path,
|
||||||
|
));
|
||||||
|
future.await?;
|
||||||
|
} else {
|
||||||
|
// 若是文件,则直接下载
|
||||||
|
let file_url = format!("{}/{}", url.trim_end_matches('/'), href);
|
||||||
|
let file_path = format!("{}/{}", path.trim_end_matches('/'), href);
|
||||||
|
let task = Task::new(file_url, file_path);
|
||||||
|
// 此处使用 COUNTER 计数,即分发下载的任务计数
|
||||||
|
{
|
||||||
|
let mut counter = COUNTER.lock().unwrap();
|
||||||
|
*counter += 1;
|
||||||
|
}
|
||||||
|
task_sender.send(task).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 下载任务,处理单个文件的下载
|
||||||
|
async fn download(
|
||||||
|
task_sender: Sender<Task>,
|
||||||
|
res_sender: Sender<Result<Task, Task>>,
|
||||||
|
task: Task,
|
||||||
|
) -> crate::Result<()> {
|
||||||
|
match async_download(task.url.clone(), Some(task.target.clone())).await {
|
||||||
|
// 若下载成功,则发送至 res channel。
|
||||||
|
Ok(()) => {
|
||||||
|
debug!("Successed synced {}", task.url);
|
||||||
|
match res_sender.send(Ok(task)).await {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Send task failed: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 如果下载失败,则发回 dispatcher 进行重试
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to download {}: {}", task.url, e);
|
||||||
|
match task_sender.send(task).await {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Send task res failed: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn result(res_recv: &mut Receiver<Result<Task, Task>>) -> crate::Result<()> {
|
||||||
|
debug!("Server::result() is running...");
|
||||||
|
let mut successed = vec![];
|
||||||
|
let mut failed = vec![];
|
||||||
|
while let Some(res) = res_recv.recv().await {
|
||||||
|
match res {
|
||||||
|
Ok(task) => {
|
||||||
|
successed.push(task);
|
||||||
|
}
|
||||||
|
Err(task) => failed.push(task),
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果所有的下载任务已完成,则返回并退出
|
||||||
|
let counter = COUNTER.lock().unwrap();
|
||||||
|
if *counter == successed.len() + failed.len() {
|
||||||
|
info!(
|
||||||
|
"All {} tasks done, successed {}, failed {}.",
|
||||||
|
counter,
|
||||||
|
successed.len(),
|
||||||
|
failed.len()
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Task {
|
||||||
|
pub url: String,
|
||||||
|
pub target: String,
|
||||||
|
pub retries: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Task {
|
||||||
|
pub fn new(url: String, target: String) -> Self {
|
||||||
|
let retries = 0;
|
||||||
|
Task {
|
||||||
|
url,
|
||||||
|
target,
|
||||||
|
retries,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 开始从配置 `config` 里执行同步任务
|
||||||
|
pub(crate) async fn run(config: &Config) -> crate::Result<()> {
|
||||||
|
info!("CUWEB-SYNCER starting...");
|
||||||
|
let mut server = Server::new();
|
||||||
|
server.run(config).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user