#coding=utf-8
#################################################
#脚本功能:ssh方式实现自动备份,需要在sync_config.ini配置文件中配置参数
#[Paths]
# local_dir = d:/BaiduSyncdisk/data #本地备份目录,注意路径格式只能使用/
# remote_dir = /data/data #远程目录
#
# [Connection]
# remote_host = 127.0.0.1 #远程连接IP
# remote_user =data
# ssh_port = 33322
# ssh_key = ~/.ssh/id_ed25519
#
# [Scan]
# days = 30 #文件扫描天数,只扫描近30天有修改过的文件,避免文件数量过多之后出现太长耗时,第一次运行手动修改此时间
# skip_today = True #是否跳过当天的文件,默认跳过,可以避免出现文件还在拷贝或者写入过程就执行文件拷贝了
#
#
#脚本编写人与日期: zsm 20250630
##################################################
import os
import subprocess
import configparser
import logging
from datetime import datetime, timedelta
import sqlite3
from contextlib import contextmanager
import time
# --- Logging setup ---------------------------------------------------------
LOG_FILE = 'sync.log'
LOG_MAX_SIZE = 1 * 1024 * 1024 * 1024  # 1 GB

# Truncate an oversized log BEFORE logging.basicConfig opens the file.
# (Truncating it afterwards would leave the already-open FileHandler with a
# stale file offset, producing a sparse file on the next write.)
_log_was_truncated = False
if os.path.exists(LOG_FILE) and os.path.getsize(LOG_FILE) > LOG_MAX_SIZE:
    with open(LOG_FILE, 'w', encoding='utf-8') as f:
        f.write("日志文件因大小超过1GB被清空\n")
    _log_was_truncated = True

# File logging plus a mirrored console handler at the same level/format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename=LOG_FILE
)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)

# Report the truncation now that logging is configured.
if _log_was_truncated:
    logging.info("日志文件大小超过1GB,正在清空...")
class RemoteFileSync:
    """One-way (remote -> local) file sync over SSH/SCP.

    Remote file metadata is cached in an SQLite database stored inside the
    local backup directory.  Configuration is read from ``sync_config.ini``;
    a template is written on first run.  Only remote files modified within
    the last ``scan_days`` days are considered, and files modified *today*
    can be skipped (``skip_today``) to avoid copying files that may still
    be open for writing on the server.
    """

    def __init__(self):
        self.start_time = time.time()  # wall-clock start, for the run-time summary
        self.config = self._load_config()
        self.local_dir = self.config.get('Paths', 'local_dir')
        self.remote_dir = self.config.get('Paths', 'remote_dir')
        self.remote_host = self.config.get('Connection', 'remote_host')
        self.remote_user = self.config.get('Connection', 'remote_user')
        self.ssh_port = self.config.getint('Connection', 'ssh_port', fallback=22)
        self.ssh_key = self.config.get('Connection', 'ssh_key', fallback=None)
        self.scan_days = self.config.getint('Scan', 'days', fallback=30)
        self.skip_today = self.config.getboolean('Scan', 'skip_today', fallback=True)
        # Metadata database lives inside the local backup directory.
        self.db_path = os.path.join(self.local_dir, 'file_info.db')
        # Make sure the local backup directory exists.
        os.makedirs(self.local_dir, exist_ok=True)
        # Initialize the metadata database.
        self._init_db()

    def _load_config(self):
        """Load ``sync_config.ini``; create a template and exit if missing.

        Raises:
            SystemExit: when the config file did not exist (a template is
                written first so the user can edit it).
        """
        config = configparser.ConfigParser()
        config_path = 'sync_config.ini'
        if not os.path.exists(config_path):
            # Write a default template for the user to fill in.
            config['Paths'] = {
                'local_dir': 'd:/BaiduSyncdisk/data',
                'remote_dir': '/data/data'
            }
            config['Connection'] = {
                'remote_host': 'your_server_ip',
                'remote_user': 'your_username',
                'ssh_port': 22,
                'ssh_key': '/path/to/your/private_key'
            }
            config['Scan'] = {
                'days': 30,          # default: scan files modified in the last 30 days
                'skip_today': 'True'  # default: skip files modified today
            }
            # Explicit UTF-8 so the template is portable regardless of the
            # platform's default encoding (matters on Windows).
            with open(config_path, 'w', encoding='utf-8') as f:
                config.write(f)
            logging.info(f"已创建配置文件: {config_path},请编辑配置")
            # raise SystemExit instead of the site-injected exit() builtin,
            # which is not guaranteed to exist (e.g. under `python -S`).
            raise SystemExit(1)
        config.read(config_path)
        return config

    def _init_db(self):
        """Create the SQLite schema (table + indexes) if not present."""
        with self._get_db_connection() as conn:
            cursor = conn.cursor()
            # File metadata table: remote path -> mtime, last scan time,
            # and whether the file was modified today.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS file_info (
                    path TEXT PRIMARY KEY,
                    mtime INTEGER NOT NULL,
                    last_checked INTEGER NOT NULL,
                    is_today INTEGER DEFAULT 0
                )
            ''')
            # Indexes to speed up the scan/cleanup queries.
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_mtime ON file_info (mtime)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_last_checked ON file_info (last_checked)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_is_today ON file_info (is_today)')
            conn.commit()

    @contextmanager
    def _get_db_connection(self):
        """Yield an SQLite connection; always closed on exit."""
        conn = sqlite3.connect(self.db_path)
        # Enable foreign-key enforcement (harmless here, future-proof).
        conn.execute("PRAGMA foreign_keys = ON")
        # Return text columns as str (UTF-8).
        conn.text_factory = str
        try:
            yield conn
        finally:
            conn.close()

    def _run_ssh_command(self, command):
        """Run *command* on the remote host via ssh and return its stdout.

        Output is captured as bytes and decoded as UTF-8 with replacement,
        so undecodable filenames cannot raise.

        Raises:
            subprocess.CalledProcessError: when ssh exits non-zero.
        """
        base_cmd = ['ssh']
        # SSH port.
        base_cmd.extend(['-p', str(self.ssh_port)])
        # Identity file, if configured.
        if self.ssh_key:
            base_cmd.extend(['-i', self.ssh_key])
        # user@host target.
        base_cmd.append(f"{self.remote_user}@{self.remote_host}")
        # The remote command itself (passed as a single argv element).
        base_cmd.append(command)
        try:
            # Capture binary output; decode manually below.
            result = subprocess.run(
                base_cmd,
                capture_output=True,
                check=True
            )
            # errors='replace' guarantees decoding never raises.
            stdout = result.stdout.decode('utf-8', errors='replace')
            return stdout.strip()
        except subprocess.CalledProcessError as e:
            stderr = e.stderr.decode('utf-8', errors='replace')
            logging.error(f"SSH命令执行失败: {stderr}")
            raise

    def _run_scp_command(self, source, destination):
        """Copy *source* to *destination* via scp; return True on success."""
        base_cmd = ['scp']
        # scp uses -P (uppercase) for the port.
        base_cmd.extend(['-P', str(self.ssh_port)])
        # Identity file, if configured.
        if self.ssh_key:
            base_cmd.extend(['-i', self.ssh_key])
        # Strip stray quote characters from the source spec (they were added
        # for remote-shell quoting and must not reach scp's argv literally).
        base_cmd.extend([source.replace("\"", "").replace("\'", ""), destination])
        try:
            # Capture binary output; decode manually below.
            result = subprocess.run(
                base_cmd,
                capture_output=True,
                check=True
            )
            stdout = result.stdout.decode('utf-8', errors='replace')
            logging.info(f"SCP命令输出: {stdout}")
            logging.info(f"文件传输成功: {source} -> {destination}")
            return True
        except subprocess.CalledProcessError as e:
            stderr = e.stderr.decode('utf-8', errors='replace')
            logging.error(f"SCP命令执行失败: {stderr}")
            return False

    def get_remote_file_info(self):
        """Scan the remote directory and return ``{remote_path: mtime}``.

        Runs a remote ``find``/``stat`` pipeline limited to the configured
        scan window, updates the local SQLite cache, then reads the result
        back (excluding today's files when ``skip_today`` is set).
        Returns an empty dict on any failure.
        """
        logging.info("正在获取远程服务器文件信息...")
        # Time window boundaries (epoch seconds).
        today_start = int(datetime.combine(datetime.today(), datetime.min.time()).timestamp())
        today_end = int(datetime.combine(datetime.today(), datetime.max.time()).timestamp())
        cutoff_time = int((datetime.now() - timedelta(days=self.scan_days)).timestamp())
        # Build the find(1) filters.
        find_filters = []
        # Only files newer than the scan cutoff.
        find_filters.append(f'-newermt "$(date -d @{cutoff_time} +\'%Y-%m-%d %H:%M:%S\')"')
        # Optionally exclude files modified/changed today.
        if self.skip_today:
            find_filters.append(f'-not -newermt "$(date -d @{today_start} +\'%Y-%m-%d %H:%M:%S\')"')
            find_filters.append(f'-not -newerct "$(date -d @{today_end} +\'%Y-%m-%d %H:%M:%S\')"')
        find_filters_str = ' '.join(find_filters)
        # Emit one "path #|# mtime" line per file; ' #|# ' is the separator
        # parsed in _update_db_with_file_info.
        find_cmd = (f'find "{self.remote_dir}" -type f {find_filters_str} '
                    f'-exec sh -c \'for file; do printf "%s #|# %s\n" "$file" "$(stat -c %Y "$file")"; done\' sh {{}} +')
        try:
            # Fetch the listing over SSH.
            file_info_text = self._run_ssh_command(find_cmd)
            # Parse it into the SQLite cache.
            self._update_db_with_file_info(file_info_text, today_start, today_end)
            # Read back from the cache, honoring skip_today.
            with self._get_db_connection() as conn:
                cursor = conn.cursor()
                if self.skip_today:
                    cursor.execute("SELECT path, mtime FROM file_info WHERE is_today = 0")
                else:
                    cursor.execute("SELECT path, mtime FROM file_info")
                return {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.error(f"获取远程文件信息失败: {e}")
            return {}

    def _update_db_with_file_info(self, file_info_text, today_start, today_end):
        """Parse the remote listing and upsert it into the SQLite cache.

        Args:
            file_info_text: lines of ``path #|# mtime`` from the remote find.
            today_start/today_end: epoch bounds of "today", used to flag
                ``is_today``.
        """
        current_time = int(datetime.now().timestamp())
        with self._get_db_connection() as conn:
            cursor = conn.cursor()
            # One explicit transaction for the whole batch (performance).
            conn.execute("BEGIN TRANSACTION")
            for line in file_info_text.strip().split('\n'):
                if not line:
                    continue
                try:
                    # rsplit once: the separator may legitimately appear
                    # earlier inside a pathological path.
                    parts = line.rsplit(' #|# ', 1)
                    if len(parts) != 2:
                        continue
                    file_path, mtime = parts
                    mtime = int(mtime)
                    # Escape '#' so stored paths match the escaped form used
                    # by get_local_file_info for comparison.
                    file_path = file_path.replace('#', '\\#')
                    # Flag files modified today.
                    is_today = 1 if today_start <= mtime <= today_end else 0
                    # Parameterized upsert (no SQL injection via paths).
                    cursor.execute(
                        """
                        INSERT OR REPLACE INTO file_info (path, mtime, last_checked, is_today)
                        VALUES (?, ?, ?, ?)
                        """,
                        (file_path, mtime, current_time, is_today)
                    )
                except Exception as e:
                    logging.warning(f"处理文件信息失败: {line}, 错误: {e}")
            # Commit the batch.
            conn.commit()
            # Purge entries older than the scan window that this scan did not
            # refresh (their last_checked predates current_time).
            cutoff_time = int((datetime.now() - timedelta(days=self.scan_days)).timestamp())
            cursor.execute(
                """
                DELETE FROM file_info
                WHERE mtime < ? AND last_checked < ?
                """,
                (cutoff_time, current_time)
            )
        logging.info(f"已更新数据库中的文件信息,跳过当日修改文件: {self.skip_today}")

    def get_local_file_info(self):
        """Walk the local backup tree and return ``{remote_path: mtime}``.

        Keys are the *remote* paths each local file corresponds to (forward
        slashes, '#' escaped), so they compare directly against the keys
        produced by get_remote_file_info.
        """
        logging.info("正在获取本地文件信息...")
        file_info = {}
        for root, _, files in os.walk(self.local_dir):
            for file in files:
                # Never sync the metadata database itself.
                if file == os.path.basename(self.db_path):
                    continue
                file_path = os.path.join(root, file)
                # Path relative to the local backup root.
                rel_path = os.path.relpath(file_path, self.local_dir)
                # Corresponding remote path: forward slashes, '#' escaped to
                # match the escaping applied on the remote side.
                remote_file_path = os.path.join(
                    self.remote_dir,
                    rel_path.replace('\\', '/')
                ).replace('\\', '/').replace('#', '\\#')
                try:
                    mtime = int(os.path.getmtime(file_path))
                    file_info[remote_file_path] = mtime
                except Exception as e:
                    logging.warning(f"获取本地文件信息失败: {file_path}, 错误: {e}")
        return file_info

    def sync_files(self):
        """Synchronize files (remote -> local only).

        New remote files are downloaded; existing files are re-downloaded
        when the remote mtime is newer than the local one by more than a
        small tolerance.  Finishes by verifying results and logging the
        total run time.
        """
        logging.info("开始同步文件...")
        # Snapshot both sides.
        remote_files = self.get_remote_file_info()
        local_files = self.get_local_file_info()
        # Files (re)downloaded in this run.
        updated_files = {}
        # 1) Remote files missing locally.
        for remote_path in list(remote_files.keys()):
            if remote_path not in local_files:
                logging.info(f"检测到新文件: {remote_path}")
                success = self._sync_from_remote(remote_path)
                if success:
                    # Record the freshly-downloaded file's local mtime.
                    local_mtime = int(os.path.getmtime(
                        os.path.join(self.local_dir, os.path.relpath(remote_path, self.remote_dir))
                    ))
                    local_files[remote_path] = local_mtime
                    updated_files[remote_path] = local_mtime
                else:
                    # Drop failed downloads so verification reports cleanly.
                    remote_files.pop(remote_path, None)
        # 2) Files present on both sides: compare timestamps.
        for remote_path in set(remote_files.keys()) & set(local_files.keys()):
            remote_mtime = remote_files[remote_path]
            local_mtime = local_files[remote_path]
            logging.debug(f"比较文件: {remote_path}")
            logging.debug(f"  远程时间戳: {remote_mtime} ({datetime.fromtimestamp(remote_mtime)})")
            logging.debug(f"  本地时间戳: {local_mtime} ({datetime.fromtimestamp(local_mtime)})")
            # Tolerance absorbs filesystem timestamp-precision differences.
            time_diff = remote_mtime - local_mtime
            TIME_TOLERANCE = 2  # seconds
            if time_diff > TIME_TOLERANCE:
                logging.info(f"远程文件更新: {remote_path} (差异: {time_diff}秒)")
                success = self._sync_from_remote(remote_path)
                if success:
                    local_mtime = int(os.path.getmtime(
                        os.path.join(self.local_dir, os.path.relpath(remote_path, self.remote_dir))
                    ))
                    local_files[remote_path] = local_mtime
                    updated_files[remote_path] = local_mtime
                    logging.info(f"更新本地时间戳: {local_mtime}")
                else:
                    remote_files.pop(remote_path, None)
            else:
                logging.debug(f"文件时间戳一致: {remote_path} (差异: {time_diff}秒)")
        # 3) Verify and report.
        self._verify_sync_results(local_files, remote_files)
        # Log total elapsed time.
        run_time = time.time() - self.start_time
        hours, remainder = divmod(run_time, 3600)
        minutes, seconds = divmod(remainder, 60)
        logging.info(f"脚本运行完成,总耗时: {int(hours)}小时{int(minutes)}分钟{seconds:.2f}秒")

    def _sync_from_remote(self, remote_path):
        """Download one remote file to its mirrored local path.

        Returns True on success, False on any failure (already logged).
        """
        # Un-escape '#' to recover the real remote path.
        local_remote_path = remote_path.replace('\\#', '#')
        rel_path = os.path.relpath(local_remote_path, self.remote_dir)
        local_path = os.path.join(self.local_dir, rel_path)
        # Make sure the destination directory exists.
        local_dir = os.path.dirname(local_path)
        os.makedirs(local_dir, exist_ok=True)
        logging.info(f"从远程同步: {local_remote_path} -> {local_path}")
        # Quote the remote path for the remote shell scp spawns.
        escaped_remote_path = self._escape_path(local_remote_path)
        # Double quotes around a single-quoted path, stripped again in
        # _run_scp_command before the local argv is built.
        source = f"{self.remote_user}@{self.remote_host}:\"'{escaped_remote_path}'\""
        destination = local_path
        logging.info(f"执行SCP命令: {' '.join(['scp', '-P', str(self.ssh_port), source, destination])}")
        # Run scp.
        success = self._run_scp_command(source, destination)
        if not success:
            # Extra context for debugging quoting problems.
            logging.error(f"同步失败: {local_remote_path}")
            logging.error(f"转义后的远程路径: {escaped_remote_path}")
            logging.error(f"SCP源路径: {source}")
            return False
        # Sanity-check the downloaded file's timestamp.
        try:
            downloaded_mtime = int(os.path.getmtime(local_path))
            logging.info(f"下载后文件时间戳: {downloaded_mtime} ({datetime.fromtimestamp(downloaded_mtime)})")
        except Exception as e:
            logging.error(f"获取下载后文件时间戳失败: {e}")
            return False
        return True

    def _escape_path(self, path):
        """Escape *path* for safe use inside a remote shell command."""
        # Single quotes are the critical character: close, escape, reopen.
        escaped = path.replace("'", "'\\''")
        # Escape '#' as well.
        escaped = escaped.replace('#', '\\#')
        # Paths containing spaces get an extra pair of single quotes.
        if ' ' in escaped:
            escaped = f"'{escaped}'"
        return escaped

    def _verify_sync_results(self, local_files, remote_files):
        """Cross-check local, remote, and cached views; log discrepancies.

        Purely diagnostic: emits warnings/info, changes nothing.
        """
        logging.info("验证同步结果...")
        with self._get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT path, mtime, is_today FROM file_info")
            db_files = {row[0]: (row[1], row[2]) for row in cursor.fetchall()}
        for remote_path in set(local_files.keys()) | set(remote_files.keys()):
            local_time = local_files.get(remote_path, None)
            remote_time = remote_files.get(remote_path, None)
            if remote_path in db_files:
                db_time, is_today = db_files[remote_path]
            else:
                db_time, is_today = None, None
            # Un-escape for display.
            display_path = remote_path.replace('\\#', '#')
            if local_time is not None and remote_time is not None:
                time_diff = local_time - remote_time
                # Deliberately loose (100 s): only flag gross mismatches,
                # not filesystem-precision or clock skew.
                if abs(time_diff) > 100:
                    logging.warning(f"时间戳冲突,本地文件日期更新: {display_path}")
                    logging.warning(f"  本地: {local_time} ({datetime.fromtimestamp(local_time)})")
                    logging.warning(f"  远程: {remote_time} ({datetime.fromtimestamp(remote_time)})")
                    logging.warning(f"  差异: {time_diff}秒")
            elif local_time is None and remote_time is not None:
                if is_today == 1:
                    if self.skip_today:
                        logging.info(f"跳过当日修改文件: {display_path}")
                    else:
                        logging.warning(f"本地缺少当日修改文件: {display_path}")
                else:
                    logging.warning(f"本地缺少文件: {display_path}")
            elif local_time is not None and remote_time is None:
                if db_time is not None:
                    if is_today == 1:
                        if self.skip_today:
                            logging.info(f"远程文件为当日修改,未扫描: {display_path}")
                        else:
                            logging.warning(f"远程当日修改文件未同步: {display_path}")
                    else:
                        logging.info(f"文件已超过扫描期限: {display_path}")
if __name__ == "__main__":
    # Script entry point: run one sync pass and always log the elapsed time,
    # even when the run fails.
    launched_at = time.time()
    try:
        RemoteFileSync().sync_files()
    except Exception as e:
        logging.error(f"同步过程中发生错误: {e}")
        import traceback
        logging.error(traceback.format_exc())
    finally:
        elapsed = time.time() - launched_at
        hours, remainder = divmod(elapsed, 3600)
        minutes, seconds = divmod(remainder, 60)
        logging.info(f"脚本总运行时间: {int(hours)}小时{int(minutes)}分钟{seconds:.2f}秒")
# 本文地址:https://zhaoshuman.cn/%E6%8A%80%E6%9C%AF%E5%88%86%E4%BA%AB/8.html
# 免责声明:本文为原创文章,版权归 zhaoshuman 所有,欢迎分享本文,转载请保留出处!