ssh方式实现文件增量同步备份到本地python源码(从远程机器备份文件到本地,本地支持windows和linux)

原创 zhaoshuman  2026-03-06 09:55:55  阅读 158 次 评论 0 条
摘要:

脚本功能:ssh方式实现自动备份,需要在sync_config.ini配置文件中按实际需求配置参数 

运行说明:脚本只为单次运行,运行结束会显示本次运行总时长,如果第一次需要同步的文件较多会耗费时间较长

主要功能如下:

  • 从配置文件读取远程服务器连接信息、同步路径、扫描规则

  • 通过 SSH 获取远程服务器指定目录下符合时间规则的文件信息

  • 对比本地文件和远程文件的修改时间,仅同步新增 / 更新的文件(增量同步)

  • 使用 SQLite 数据库记录文件信息,避免重复扫描和同步,会在同步的本地目录生成file_info.db数据库文件,保存文件信息

  • 提供完善的日志记录、错误处理和同步结果验证


#coding=utf-8
#################################################
#脚本功能:ssh方式实现自动备份,需要在sync_config.ini配置文件中配置参数
#[Paths]
# local_dir = d:/BaiduSyncdisk/data   #本地备份目录,注意路径格式只能使用/
# remote_dir = /data/data             #远程目录
#
# [Connection]
# remote_host = 127.0.0.1               #远程连接IP
# remote_user =data
# ssh_port = 33322
# ssh_key = ~/.ssh/id_ed25519
#
# [Scan]
# days = 30                             #文件扫描天数,只扫描近30有修改过的文件,避免文件数量过多之后出现太长耗时,第一次运行手动修改此时间
# skip_today = True                     #是否跳过当天的文件,默认跳过,可以避免出现文件还在拷贝或者写入过程就执行文件拷贝了
#
#
#脚本编写人与日期: zsm 20250630
##################################################
import os
import subprocess
import configparser
import logging
from datetime import datetime, timedelta
import sqlite3
from contextlib import contextmanager
import time
# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='sync.log'
)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
# 检查日志文件大小,超过1GB则清空
LOG_MAX_SIZE = 1 * 1024 * 1024 * 1024  # 1GB
if os.path.exists('sync.log'):
    log_size = os.path.getsize('sync.log')
    if log_size > LOG_MAX_SIZE:
        logging.info(f"日志文件大小超过1GB,正在清空...")
        with open('sync.log', 'w') as f:
            f.write("日志文件因大小超过1GB被清空\n")
class RemoteFileSync:
    def __init__(self):
        self.start_time = time.time()  # 记录开始时间
        self.config = self._load_config()
        self.local_dir = self.config.get('Paths', 'local_dir')
        self.remote_dir = self.config.get('Paths', 'remote_dir')
        self.remote_host = self.config.get('Connection', 'remote_host')
        self.remote_user = self.config.get('Connection', 'remote_user')
        self.ssh_port = self.config.getint('Connection', 'ssh_port', fallback=22)
        self.ssh_key = self.config.get('Connection', 'ssh_key', fallback=None)
        self.scan_days = self.config.getint('Scan', 'days', fallback=30)
        self.skip_today = self.config.getboolean('Scan', 'skip_today', fallback=True)  # 新增配置项
        # 数据库路径
        self.db_path = os.path.join(self.local_dir, 'file_info.db')
        # 确保本地目录存在
        os.makedirs(self.local_dir, exist_ok=True)
        # 初始化数据库
        self._init_db()
    def _load_config(self):
        """加载配置文件"""
        config = configparser.ConfigParser()
        config_path = 'sync_config.ini'
        if not os.path.exists(config_path):
            # 创建默认配置文件
            config['Paths'] = {
                'local_dir': 'd:/BaiduSyncdisk/data',
                'remote_dir': '/data/data'
            }
            config['Connection'] = {
                'remote_host': 'your_server_ip',
                'remote_user': 'your_username',
                'ssh_port': 22,
                'ssh_key': '/path/to/your/private_key'
            }
            config['Scan'] = {
                'days': 30,  # 默认扫描30天内的文件
                'skip_today': 'True'  # 新增:是否跳过当日修改的文件
            }
            with open(config_path, 'w') as f:
                config.write(f)
            logging.info(f"已创建配置文件: {config_path},请编辑配置")
            exit(1)
        config.read(config_path)
        return config
    def _init_db(self):
        """初始化SQLite数据库"""
        with self._get_db_connection() as conn:
            cursor = conn.cursor()
            # 创建文件信息表(确保表名和字段名不包含特殊字符)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS file_info (
                    path TEXT PRIMARY KEY,
                    mtime INTEGER NOT NULL,
                    last_checked INTEGER NOT NULL,
                    is_today INTEGER DEFAULT 0
                )
            ''')
            # 创建索引以加速查询(确保索引名不包含特殊字符)
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_mtime ON file_info (mtime)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_last_checked ON file_info (last_checked)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_is_today ON file_info (is_today)')
            conn.commit()
    @contextmanager
    def _get_db_connection(self):
        """获取数据库连接,使用上下文管理器确保资源正确关闭"""
        conn = sqlite3.connect(self.db_path)
        # 启用外键约束(如果需要)
        conn.execute("PRAGMA foreign_keys = ON")
        # 设置文本编码为UTF-8
        conn.text_factory = str
        try:
            yield conn
        finally:
            conn.close()
    def _run_ssh_command(self, command):
        """执行SSH命令"""
        base_cmd = ['ssh']
        # 添加SSH端口
        base_cmd.extend(['-p', str(self.ssh_port)])
        # 添加SSH密钥(如果有)
        if self.ssh_key:
            base_cmd.extend(['-i', self.ssh_key])
        # 添加远程用户和主机
        base_cmd.append(f"{self.remote_user}@{self.remote_host}")
        # 添加要执行的命令
        base_cmd.append(command)
        try:
            # 执行命令并捕获二进制输出
            result = subprocess.run(
                base_cmd,
                capture_output=True,
                check=True
            )
            # 手动使用UTF-8解码输出
            stdout = result.stdout.decode('utf-8', errors='replace')
            return stdout.strip()
        except subprocess.CalledProcessError as e:
            # 手动解码错误输出
            stderr = e.stderr.decode('utf-8', errors='replace')
            logging.error(f"SSH命令执行失败: {stderr}")
            raise
        except UnicodeDecodeError as e:
            logging.error(f"解码SSH输出失败: {e}")
            return ""
    def _run_scp_command(self, source, destination):
        """执行SCP命令"""
        base_cmd = ['scp']
        # 添加SSH端口
        base_cmd.extend(['-P', str(self.ssh_port)])
        # 添加SSH密钥(如果有)
        if self.ssh_key:
            base_cmd.extend(['-i', self.ssh_key])
        # 添加源和目标
        base_cmd.extend([source.replace("\"", "").replace("\'", ""), destination])
        try:
            # 执行命令并捕获二进制输出
            result = subprocess.run(
                base_cmd,
                capture_output=True,
                check=True
            )
            # 手动使用UTF-8解码输出
            stdout = result.stdout.decode('utf-8', errors='replace')
            logging.info(f"SCP命令输出: {stdout}")
            logging.info(f"文件传输成功: {source} -> {destination}")
            return True
        except subprocess.CalledProcessError as e:
            # 手动解码错误输出
            stderr = e.stderr.decode('utf-8', errors='replace')
            logging.error(f"SCP命令执行失败: {stderr}")
            return False
    def get_remote_file_info(self):
        """获取远程服务器文件信息并保存到数据库"""
        logging.info(f"正在获取远程服务器文件信息...")
        # 计算时间范围
        today_start = int(datetime.combine(datetime.today(), datetime.min.time()).timestamp())
        today_end = int(datetime.combine(datetime.today(), datetime.max.time()).timestamp())
        cutoff_time = int((datetime.now() - timedelta(days=self.scan_days)).timestamp())
        # 构建获取文件信息的命令
        find_filters = []
        # 添加时间范围过滤
        find_filters.append(f'-newermt "$(date -d @{cutoff_time} +\'%Y-%m-%d %H:%M:%S\')"')
        # 添加跳过当日文件的过滤
        if self.skip_today:
            find_filters.append(f'-not -newermt "$(date -d @{today_start} +\'%Y-%m-%d %H:%M:%S\')"')
            find_filters.append(f'-not -newerct "$(date -d @{today_end} +\'%Y-%m-%d %H:%M:%S\')"')
        find_filters_str = ' '.join(find_filters)
        find_cmd = (f'find "{self.remote_dir}" -type f {find_filters_str} '
                    f'-exec sh -c \'for file; do printf "%s #|# %s\n" "$file" "$(stat -c %Y "$file")"; done\' sh {{}} +')
        try:
            # 执行命令获取文件信息
            file_info_text = self._run_ssh_command(find_cmd)
            # 解析文件信息并更新数据库
            self._update_db_with_file_info(file_info_text, today_start, today_end)
            # 从数据库获取所有文件信息,根据skip_today决定是否包含当日文件
            with self._get_db_connection() as conn:
                cursor = conn.cursor()
                if self.skip_today:
                    cursor.execute("SELECT path, mtime FROM file_info WHERE is_today = 0")
                else:
                    cursor.execute("SELECT path, mtime FROM file_info")
                return {row[0]: row[1] for row in cursor.fetchall()}
        except Exception as e:
            logging.error(f"获取远程文件信息失败: {e}")
            return {}
    def _update_db_with_file_info(self, file_info_text, today_start, today_end):
        """将文件信息更新到数据库"""
        current_time = int(datetime.now().timestamp())
        with self._get_db_connection() as conn:
            cursor = conn.cursor()
            # 开始事务以提高性能
            conn.execute("BEGIN TRANSACTION")
            for line in file_info_text.strip().split('\n'):
                if not line:
                    continue
                try:
                    parts = line.rsplit(' #|# ', 1)
                    if len(parts) != 2:
                        continue
                    file_path, mtime = parts
                    mtime = int(mtime)
                    # 转义文件路径中的特殊字符,特别是#号
                    file_path = file_path.replace('#', '\\#')
                    # 判断是否为当日修改的文件
                    is_today = 1 if today_start <= mtime <= today_end else 0
                    # 插入或更新文件信息(使用参数化查询避免SQL注入)
                    cursor.execute(
                        """
                        INSERT OR REPLACE INTO file_info (path, mtime, last_checked, is_today)
                        VALUES (?, ?, ?, ?)
                        """,
                        (file_path, mtime, current_time, is_today)
                    )
                except Exception as e:
                    logging.warning(f"处理文件信息失败: {line}, 错误: {e}")
            # 提交事务
            conn.commit()
            # 清理超过扫描期限且未更新的文件(使用参数化查询)
            cutoff_time = int((datetime.now() - timedelta(days=self.scan_days)).timestamp())
            cursor.execute(
                """
                DELETE FROM file_info 
                WHERE mtime < ? AND last_checked < ?
                """,
                (cutoff_time, current_time)
            )
            logging.info(f"已更新数据库中的文件信息,跳过当日修改文件: {self.skip_today}")
    def get_local_file_info(self):
        """获取本地文件信息"""
        logging.info("正在获取本地文件信息...")
        file_info = {}
        for root, _, files in os.walk(self.local_dir):
            for file in files:
                # 跳过数据库文件本身
                if file == os.path.basename(self.db_path):
                    continue
                file_path = os.path.join(root, file)
                # 计算相对路径
                rel_path = os.path.relpath(file_path, self.local_dir)
                # 构建对应的远程路径(转义#号)
                remote_file_path = os.path.join(
                    self.remote_dir,
                    rel_path.replace('\\', '/')  # 确保远程路径使用正斜杠
                ).replace('\\', '/').replace('#', '\\#')
                # 获取文件修改时间
                try:
                    mtime = int(os.path.getmtime(file_path))
                    file_info[remote_file_path] = mtime
                except Exception as e:
                    logging.warning(f"获取本地文件信息失败: {file_path}, 错误: {e}")
        return file_info
    def sync_files(self):
        """同步文件(仅从远程到本地)"""
        logging.info("开始同步文件...")
        # 获取远程和本地文件信息
        remote_files = self.get_remote_file_info()
        local_files = self.get_local_file_info()
        # 用于跟踪需要更新的文件信息
        updated_files = {}
        # 同步远程有但本地没有的文件
        for remote_path in list(remote_files.keys()):
            if remote_path not in local_files:
                logging.info(f"检测到新文件: {remote_path}")
                success = self._sync_from_remote(remote_path)
                if success:
                    # 更新本地文件信息
                    local_mtime = int(os.path.getmtime(
                        os.path.join(self.local_dir, os.path.relpath(remote_path, self.remote_dir))
                    ))
                    local_files[remote_path] = local_mtime
                    updated_files[remote_path] = local_mtime
                else:
                    remote_files.pop(remote_path, None)
        # 比较修改时间,同步更新的文件
        for remote_path in set(remote_files.keys()) & set(local_files.keys()):
            remote_mtime = remote_files[remote_path]
            local_mtime = local_files[remote_path]
            # 增加调试日志
            logging.debug(f"比较文件: {remote_path}")
            logging.debug(f"  远程时间戳: {remote_mtime} ({datetime.fromtimestamp(remote_mtime)})")
            logging.debug(f"  本地时间戳: {local_mtime} ({datetime.fromtimestamp(local_mtime)})")
            # 使用容差值比较时间戳,解决精度差异问题
            time_diff = remote_mtime - local_mtime
            TIME_TOLERANCE = 2  # 允许2秒的误差,可根据需要调整
            if time_diff > TIME_TOLERANCE:
                logging.info(f"远程文件更新: {remote_path} (差异: {time_diff}秒)")
                success = self._sync_from_remote(remote_path)
                if success:
                    # 更新本地文件信息
                    local_mtime = int(os.path.getmtime(
                        os.path.join(self.local_dir, os.path.relpath(remote_path, self.remote_dir))
                    ))
                    local_files[remote_path] = local_mtime
                    updated_files[remote_path] = local_mtime
                    logging.info(f"更新本地时间戳: {local_mtime}")
                else:
                    remote_files.pop(remote_path, None)
            else:
                logging.debug(f"文件时间戳一致: {remote_path} (差异: {time_diff}秒)")
        # 验证同步结果
        self._verify_sync_results(local_files, remote_files)
        # 计算并记录运行时长
        run_time = time.time() - self.start_time
        hours, remainder = divmod(run_time, 3600)
        minutes, seconds = divmod(remainder, 60)
        logging.info(f"脚本运行完成,总耗时: {int(hours)}小时{int(minutes)}分钟{seconds:.2f}秒")
    def _sync_from_remote(self, remote_path):
        """从远程同步文件到本地"""
        # 构建本地路径(反转义#号)
        local_remote_path = remote_path.replace('\\#', '#')
        rel_path = os.path.relpath(local_remote_path, self.remote_dir)
        local_path = os.path.join(self.local_dir, rel_path)
        # 确保本地目录存在
        local_dir = os.path.dirname(local_path)
        os.makedirs(local_dir, exist_ok=True)
        logging.info(f"从远程同步: {local_remote_path} -> {local_path}")
        # 使用更健壮的路径转义方法
        escaped_remote_path = self._escape_path(local_remote_path)
        # 使用双引号包裹整个路径,内部使用单引号保护
        source = f"{self.remote_user}@{self.remote_host}:\"'{escaped_remote_path}'\""
        destination = local_path
        logging.info(f"执行SCP命令: {' '.join(['scp', '-P', str(self.ssh_port), source, destination])}")
        # 执行SCP命令并获取结果
        success = self._run_scp_command(source, destination)
        # 如果失败,记录详细的调试信息
        if not success:
            logging.error(f"同步失败: {local_remote_path}")
            logging.error(f"转义后的远程路径: {escaped_remote_path}")
            logging.error(f"SCP源路径: {source}")
            return False
        # 验证下载后的文件时间戳
        try:
            downloaded_mtime = int(os.path.getmtime(local_path))
            logging.info(f"下载后文件时间戳: {downloaded_mtime} ({datetime.fromtimestamp(downloaded_mtime)})")
        except Exception as e:
            logging.error(f"获取下载后文件时间戳失败: {e}")
            return False
        return True
    def _escape_path(self, path):
        """更健壮的路径转义方法,处理所有特殊字符"""
        # 转义单引号 - 这是最关键的特殊字符
        escaped = path.replace("'", "'\\''")
        # 转义#号
        escaped = escaped.replace('#', '\\#')
        # 对于包含空格的路径,额外添加保护
        if ' ' in escaped:
            # 在路径两边添加单引号,并用双引号包裹整个路径
            escaped = f"'{escaped}'"
        return escaped
    def _verify_sync_results(self, local_files, remote_files):
        """验证同步结果"""
        logging.info("验证同步结果...")
        with self._get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT path, mtime, is_today FROM file_info")
            db_files = {row[0]: (row[1], row[2]) for row in cursor.fetchall()}
        for remote_path in set(local_files.keys()) | set(remote_files.keys()):
            local_time = local_files.get(remote_path, None)
            remote_time = remote_files.get(remote_path, None)
            if remote_path in db_files:
                db_time, is_today = db_files[remote_path]
            else:
                db_time, is_today = None, None
            # 反转义路径以便显示
            display_path = remote_path.replace('\\#', '#')
            if local_time is not None and remote_time is not None:
                time_diff = local_time - remote_time
                if abs(time_diff) > 100:  # 允许2秒的误差
                    logging.warning(f"时间戳冲突,本地文件日期更新: {display_path}")
                    logging.warning(f"  本地: {local_time} ({datetime.fromtimestamp(local_time)})")
                    logging.warning(f"  远程: {remote_time} ({datetime.fromtimestamp(remote_time)})")
                    logging.warning(f"  差异: {time_diff}秒")
            elif local_time is None and remote_time is not None:
                if is_today == 1:
                    if self.skip_today:
                        logging.info(f"跳过当日修改文件: {display_path}")
                    else:
                        logging.warning(f"本地缺少当日修改文件: {display_path}")
                else:
                    logging.warning(f"本地缺少文件: {display_path}")
            elif local_time is not None and remote_time is None:
                if db_time is not None:
                    if is_today == 1:
                        if self.skip_today:
                            logging.info(f"远程文件为当日修改,未扫描: {display_path}")
                        else:
                            logging.warning(f"远程当日修改文件未同步: {display_path}")
                    else:
                        logging.info(f"文件已超过扫描期限: {display_path}")
if __name__ == "__main__":
    start_time = time.time()
    try:
        sync_tool = RemoteFileSync()
        sync_tool.sync_files()
    except Exception as e:
        logging.error(f"同步过程中发生错误: {e}")
        import traceback
        logging.error(traceback.format_exc())
    finally:
        # 无论是否出错,都记录总运行时间
        total_run_time = time.time() - start_time
        hours, remainder = divmod(total_run_time, 3600)
        minutes, seconds = divmod(remainder, 60)
        logging.info(f"脚本总运行时间: {int(hours)}小时{int(minutes)}分钟{seconds:.2f}秒")


本文地址:https://zhaoshuman.cn/%E6%8A%80%E6%9C%AF%E5%88%86%E4%BA%AB/8.html
免责声明:本文为原创文章,版权归 zhaoshuman 所有,欢迎分享本文,转载请保留出处!

发表评论


表情

还没有留言,还不快点抢沙发?