python开启http/https代理源码(带 IP 白名单 + 域名白名单,兼容 Python 3.5.2)

原创 zhaoshuman  2026-03-13 09:26:37  阅读 140 次 评论 0 条
摘要:

使用第三方代理软件存在一定安全风险,使用python源码让风险可控,且可通过ip白敏名单和域名白名单控制访问权限,进一步控制风险,此代理的典型应用场景:A机器可以访问github.com,B机器所在网络能访问A机器但是不能直接访问github.com,则可以在A机器上运行此脚本,再B机器上配置浏览器的代理指向A机器8080(脚本默认端口,若修改后请使用新端口,如何配置浏览器代理请自行百度)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多线程版 HTTP/HTTPS 代理(带 IP 白名单 + 域名白名单,兼容 Python 3.5.2)
监听 0.0.0.0:10225,仅允许白名单内 IP 客户端连接,仅允许访问白名单内域名
"""
import socket
import select
import signal
import sys
import time
import threading
import ipaddress
import re
# ==================== IP 白名单配置(可自定义修改) ====================
# 支持两种格式:
# 1. 单个 IP 地址(如 "127.0.0.1"、"192.168.1.100")
# 2. C 段/网段(如 "192.168.1.0/24"、"10.0.0.0/8")
IP_WHITELIST = [
    "127.0.0.1",          # 本机回环地址(必选,用于本机测试)
    #"0.0.0.0/0",          # 允许所有 IP(生产环境请修改为具体网段)
    "20.20.20.0/24",     # 内网 C 段(可根据你的内网网段修改)
    "10.10.0.0/16"
    # "119.6.57.173",      # 可添加指定的外部公网 IP
    # "220.181.0.0/16",    # 可添加指定的公网网段
]
# ==================================================================
# ==================== 域名白名单配置(新增) ====================
# 支持两种格式:
# 1. 精确域名匹配(如 "www.baidu.com"、"api.github.com")
# 2. 通配符匹配(如 "*.baidu.com" 匹配所有 baidu.com 子域名)
#    - 注意:通配符只匹配一级子域名,如 "*.example.com" 匹配 "www.example.com" 但不匹配 "sub.www.example.com"
#    - 如需匹配多级子域名,请使用多个规则或使用 "*." 开头的规则组合
DOMAIN_WHITELIST = [
    # 百度系
    "*",
    "*.baidu.com",        # 匹配所有 baidu.com 子域名    
    # GitHub
    "github.com",
    "*.github.com",
        # GitHub
    "api.loadteam.com",
    "*.loadteam.com"
    
    # 示例:允许所有域名(测试用,生产环境请删除)
    #"*",                 # 通配符 * 表示允许所有域名(慎用)
]
# ==================================================================
# 代理基础配置项
LISTEN_IP, LISTEN_PORT = '0.0.0.0', 8080  #指定代理端口
BUFFER_SIZE = 16384
TIMEOUT_SECONDS = 60
MAX_HEADER_SIZE = 65536
MAX_THREADS = 100  # 最大线程数限制
THREAD_POOL = []  # 线程池
# 全局变量 + 线程锁
ACTIVE_CONNECTIONS = 0
CONN_LOCK = threading.Lock()
THREAD_LOCK = threading.Lock()
def is_allowed_domain(target_host):
    """
    校验目标域名是否在白名单内(新增函数)
    :param target_host: 目标域名(字符串格式,如 "www.baidu.com")
    :return: True(允许访问)/ False(拒绝访问)
    """
    if not target_host:
        return False
    
    # 转换为小写进行不区分大小写的匹配
    host = target_host.lower().strip()
    
    for pattern in DOMAIN_WHITELIST:
        pattern = pattern.lower().strip()
        
        # 完全通配符:允许所有域名
        if pattern == "*":
            return True
            
        # 通配符匹配:*.example.com
        if pattern.startswith("*."):
            # 去掉通配符后的域名后缀
            suffix = pattern[1:]  # 如 ".baidu.com"
            # 检查是否以该后缀结尾,或者完全等于去掉通配符后的部分(如 "baidu.com" 匹配 "*.baidu.com")
            if host.endswith(suffix) or host == suffix[1:]:
                return True
                
        # 精确匹配
        if host == pattern:
            return True
            
    return False
def is_allowed_ip(client_ip):
    """
    校验客户端 IP 是否在白名单内(兼容 Python 3.5.2)
    :param client_ip: 客户端 IP 地址(字符串格式)
    :return: True(允许连接)/ False(拒绝连接)
    """
    if not client_ip:
        return False
    try:
        # 将客户端 IP 转为 ip_address 对象
        client_ip_obj = ipaddress.ip_address(client_ip)
        # 遍历白名单,逐一校验
        for allowed_item in IP_WHITELIST:
            try:
                # Python 3.5.2 ip_network 不支持 strict=False,直接尝试网段匹配
                allowed_network = ipaddress.ip_network(allowed_item)
                if client_ip_obj in allowed_network:
                    return True
            except ValueError:
                # 网段匹配失败,按单个 IP 精确匹配
                try:
                    allowed_ip_obj = ipaddress.ip_address(allowed_item)
                    if client_ip_obj == allowed_ip_obj:
                        return True
                except:
                    # 非法 IP 格式,跳过该白名单项
                    continue
        # 未匹配到白名单
        return False
    except Exception as e:
        log("IP 校验异常 | 客户端 IP: {} | 错误: {}".format(client_ip, e))
        return False
def log(msg):
    """带时间戳、连接数的线程安全日志(使用 format 兼容 Python 3.5)"""
    global ACTIVE_CONNECTIONS
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    with CONN_LOCK:
        print("[{}] [CONNS:{}] {}".format(timestamp, ACTIVE_CONNECTIONS, msg), flush=True)
def set_socket_opt(sock):
    """优化套接字参数"""
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if hasattr(socket, 'TCP_KEEPIDLE'):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 5)
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUFFER_SIZE * 4)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, BUFFER_SIZE * 4)
        sock.settimeout(TIMEOUT_SECONDS)
    except Exception as e:
        log("套接字参数优化失败: {}".format(e))
def relay_bidirectional(client_sock, remote_sock, desc=""):
    """鲁棒的双向数据转发(兼容 Python 3.5.2)"""
    global ACTIVE_CONNECTIONS
    bytes_sent = 0
    bytes_recv = 0
    try:
        set_socket_opt(client_sock)
        set_socket_opt(remote_sock)
        read_list = [client_sock, remote_sock]
        log("{}: 开始双向转发 (缓冲区:{} 超时:{}s)".format(desc, BUFFER_SIZE, TIMEOUT_SECONDS))
        while read_list:
            rlist, _, xlist = select.select(read_list, [], read_list, TIMEOUT_SECONDS)
            if xlist:
                log("{}: 套接字异常 {}".format(desc, xlist))
                break
            if not rlist:
                if bytes_sent + bytes_recv > 0:
                    log("{}: 超时但已转发数据,继续等待 (已发:{} 已收:{})".format(desc, bytes_sent, bytes_recv))
                    continue
                log("{}: 无数据转发超时退出".format(desc))
                break
            for sock in rlist:
                try:
                    data = sock.recv(BUFFER_SIZE)
                    if not data:
                        try:
                            test_data = sock.recv(1, socket.MSG_PEEK)
                            if not test_data:
                                log("{}: 确认对方关闭连接 (已转发: {} 字节)".format(desc, bytes_sent+bytes_recv))
                                return
                        except:
                            pass
                        continue
                    if sock is client_sock:
                        target = remote_sock
                        bytes_sent += len(data)
                        log("{}: 客户端→目标 [{} 字节] (累计:{})".format(desc, len(data), bytes_sent))
                    else:
                        target = client_sock
                        bytes_recv += len(data)
                        log("{}: 目标→客户端 [{} 字节] (累计:{})".format(desc, len(data), bytes_recv))
                    target.sendall(data)
                except BlockingIOError:
                    continue
                except socket.timeout:
                    log("{}: 单方向数据读取超时,继续等待".format(desc))
                    continue
                except Exception as e:
                    log("{}: 转发错误 {}: {}".format(desc, type(e).__name__, e))
                    return
    except Exception as e:
        log("{}: 转发异常 {}: {}".format(desc, type(e).__name__, e))
    finally:
        with CONN_LOCK:
            ACTIVE_CONNECTIONS -= 1
        # 优雅关闭套接字
        for s in [client_sock, remote_sock]:
            try:
                s.shutdown(socket.SHUT_RDWR)
            except:
                pass
            try:
                s.close()
            except:
                pass
        log("{}: 转发结束 (总计转发: {} 字节)".format(desc, bytes_sent+bytes_recv))
def parse_target(request_line):
    """解析目标地址(兼容 Python 3.5.2)"""
    try:
        if request_line.startswith('CONNECT'):
            target = request_line.split()[1]
            if ':' in target:
                host, port = target.split(':', 1)
                port = int(port)
            else:
                host = target
                port = 443
            return host, port
        else:
            from urllib.parse import urlparse
            url = request_line.split()[1]
            parsed = urlparse(url)
            host = parsed.hostname
            port = parsed.port or (80 if parsed.scheme == 'http' else 443)
            return host, port
    except Exception as e:
        log("解析目标失败: {} | 请求行: {}".format(e, request_line[:100]))
        return None, None
def handle_connect(client_sock, request_line, client_addr):
    """处理 HTTPS CONNECT 隧道(增加域名白名单校验)"""
    host, port = parse_target(request_line)
    if not host or not port:
        client_sock.sendall(b'HTTP/1.1 400 Bad Request\r\n\r\nInvalid target')
        return
    # ==================== 域名白名单校验(新增) ====================
    if not is_allowed_domain(host):
        log("❌ 拒绝访问非白名单域名 | 客户端: {} | 目标: {}:{}".format(client_addr, host, port))
        error_response = (
            b"HTTP/1.1 403 Forbidden\r\n"
            b"Content-Type: text/plain; charset=utf-8\r\n"
            b"Connection: close\r\n"
            b"\r\n"
            b"403 Forbidden: Domain not in whitelist\r\n"
        )
        try:
            client_sock.sendall(error_response)
        except:
            pass
        return
    # ==================================================================
    try:
        remote_sock = None
        for retry in range(2):
            try:
                remote_sock = socket.create_connection((host, port), timeout=10)
                break
            except:
                time.sleep(0.1)
        if not remote_sock:
            raise Exception("连接目标失败(2次重试)")
        response = (
            b"HTTP/1.1 200 Connection Established\r\n"
            b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S GMT").encode() + b"\r\n"
            b"Server: SimpleProxy/1.0\r\n"
            b"Connection: keep-alive\r\n"
            b"Proxy-Connection: keep-alive\r\n"
            b"\r\n"
        )
        client_sock.sendall(response)
        relay_bidirectional(
            client_sock, remote_sock,
            "{} <-> {}:{} (HTTPS)".format(client_addr, host, port)
        )
    except Exception as e:
        error_msg = "HTTP/1.1 502 Bad Gateway\r\nContent-Type: text/plain\r\n\r\n{}".format(str(e))
        try:
            client_sock.sendall(error_msg.encode('utf-8', errors='replace'))
        except:
            pass
        log("{}: CONNECT 失败 {}:{} | {}".format(client_addr, host, port, e))
def handle_http(client_sock, request_data, client_addr):
    """处理 HTTP 请求(增加域名白名单校验)"""
    try:
        header_end = request_data.find(b'\r\n\r\n')
        if header_end == -1:
            client_sock.sendall(b'HTTP/1.1 400 Bad Request\r\n\r\nMissing header end')
            return
        request_line = request_data[:header_end].split(b'\r\n')[0].decode('utf-8', errors='ignore')
        host, port = parse_target(request_line)
        if not host or not port:
            client_sock.sendall(b'HTTP/1.1 400 Bad Request\r\n\r\nInvalid URL')
            return
        # ==================== 域名白名单校验(新增) ====================
        if not is_allowed_domain(host):
            log("❌ 拒绝访问非白名单域名 | 客户端: {} | 目标: {}:{}".format(client_addr, host, port))
            error_response = (
                b"HTTP/1.1 403 Forbidden\r\n"
                b"Content-Type: text/plain; charset=utf-8\r\n"
                b"Connection: close\r\n"
                b"\r\n"
                b"403 Forbidden: Domain not in whitelist\r\n"
            )
            try:
                client_sock.sendall(error_response)
            except:
                pass
            return
        # ==================================================================
        header_lines = request_data[:header_end].split(b'\r\n')
        new_headers = []
        has_host = False
        for line in header_lines:
            lower_line = line.lower()
            if any(p in lower_line for p in [b'proxy-', b'connection: keep-alive']):
                continue
            if lower_line.startswith(b'host:'):
                has_host = True
            new_headers.append(line)
        if not has_host:
            new_headers.insert(1, "Host: {}".format(host).encode())
        new_headers.append(b"Connection: keep-alive")
        new_headers.append(b"Accept-Encoding: gzip, deflate")
        new_header = b'\r\n'.join(new_headers)
        body = request_data[header_end+4:]
        forward_data = new_header + b'\r\n\r\n' + body
        remote_sock = socket.create_connection((host, port), timeout=10)
        set_socket_opt(remote_sock)
        remote_sock.sendall(forward_data)
        relay_bidirectional(
            client_sock, remote_sock,
            "{} <-> {}:{} (HTTP)".format(client_addr, host, port)
        )
    except Exception as e:
        error_msg = "HTTP/1.1 500 Internal Server Error\r\n\r\n{}".format(str(e))
        try:
            client_sock.sendall(error_msg.encode())
        except:
            pass
        log("{}: HTTP 处理失败 | {}".format(client_addr, e))
def handle_client(client_sock, client_addr):
    """处理单个客户端连接(工作线程执行函数,兼容 Python 3.5.2)"""
    global ACTIVE_CONNECTIONS
    with CONN_LOCK:
        ACTIVE_CONNECTIONS += 1
    try:
        client_sock.settimeout(TIMEOUT_SECONDS)
        set_socket_opt(client_sock)
        request_data = b''
        while len(request_data) < MAX_HEADER_SIZE:
            chunk = client_sock.recv(BUFFER_SIZE)
            if not chunk:
                return
            request_data += chunk
            if b'\r\n\r\n' in request_data:
                break
        if not request_data:
            log("{}: 空请求,关闭连接".format(client_addr))
            return
        first_line = request_data.split(b'\r\n')[0].decode('utf-8', errors='ignore').strip()
        log("{}: 收到请求 | {}".format(client_addr, first_line[:100]))
        if first_line.upper().startswith('CONNECT'):
            handle_connect(client_sock, first_line, client_addr)
        else:
            handle_http(client_sock, request_data, client_addr)
    except socket.timeout:
        log("{}: 客户端超时({}s)".format(client_addr, TIMEOUT_SECONDS))
    except Exception as e:
        log("{}: 处理异常 {}: {}".format(client_addr, type(e).__name__, e))
    finally:
        try:
            client_sock.close()
        except:
            pass
        # 清理线程池中的无效线程(兼容 Python 3.5 列表操作)
        with THREAD_LOCK:
            # 先创建新列表,再赋值给 THREAD_POOL,避免迭代时修改列表
            valid_threads = []
            for thread in THREAD_POOL:
                if thread.is_alive():
                    valid_threads.append(thread)
            THREAD_POOL[:] = valid_threads
def main():
    """主服务入口(IP 过滤 + 线程分配,兼容 Python 3.5.2)"""
    def signal_handler(_sig, _frame):
        log("收到退出信号,正在关闭所有线程...")
        with THREAD_LOCK:
            for thread in THREAD_POOL:
                thread.join(timeout=2)
        log("所有线程已关闭,代理退出")
        sys.exit(0)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    try:
        server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        server_sock.bind((LISTEN_IP, LISTEN_PORT))
        server_sock.listen(512)
        server_sock.settimeout(1.0)
        log("多线程代理服务器启动成功 ✅ | {}:{}".format(LISTEN_IP, LISTEN_PORT))
        log("IP 白名单: {}".format(IP_WHITELIST))
        log("域名白名单: {}".format(DOMAIN_WHITELIST))  # 新增日志输出
        log("最大并发线程数: {}".format(MAX_THREADS))
        log("测试命令: curl -x http://127.0.0.1:{} https://www.baidu.com".format(LISTEN_PORT))
        while True:
            try:
                # 接受新客户端连接
                client_sock, client_addr = server_sock.accept()
                client_ip = client_addr[0]  # 提取客户端 IP 地址(如 192.168.1.100)
                # ==================== IP 白名单过滤(核心拦截点) ====================
                if not is_allowed_ip(client_ip):
                    # 非白名单 IP,直接关闭连接并记录日志
                    log("❌ 拒绝非白名单 IP 连接 | 客户端 IP: {} | 地址: {}".format(client_ip, client_addr))
                    client_sock.close()
                    continue
                # ==================================================================
                # 控制线程数,避免资源耗尽
                with THREAD_LOCK:
                    # 清理已结束的线程
                    valid_threads = []
                    for thread in THREAD_POOL:
                        if thread.is_alive():
                            valid_threads.append(thread)
                    THREAD_POOL[:] = valid_threads
                    # 若线程数达到上限,等待一段时间
                    if len(THREAD_POOL) >= MAX_THREADS:
                        log("线程数达到上限 {},等待空闲线程...".format(MAX_THREADS))
                        time.sleep(1)
                        continue
                # 创建新线程处理客户端连接
                client_thread = threading.Thread(
                    target=handle_client,
                    args=(client_sock, client_addr),
                    daemon=True  # 守护线程,主程序退出时自动销毁
                )
                # 将线程加入线程池
                with THREAD_LOCK:
                    THREAD_POOL.append(client_thread)
                # 启动线程
                client_thread.start()
                log("✅ {}: 已分配工作线程(当前线程数: {})".format(client_addr, len(THREAD_POOL)))
            except socket.timeout:
                continue
            except Exception as e:
                log("主循环异常: {}".format(e))
    except PermissionError:
        log("权限不足 ❌ | 无法绑定 {}(需 sudo 或端口>1024)".format(LISTEN_PORT))
        sys.exit(1)
    except Exception as e:
        log("启动失败 ❌ | {}".format(e))
        sys.exit(1)
    finally:
        try:
            server_sock.close()
        except:
            pass
if __name__ == '__main__':
    main()


本文地址:https://zhaoshuman.cn/%E6%8A%80%E6%9C%AF%E5%88%86%E4%BA%AB/10.html
免责声明:本文为原创文章,版权归 zhaoshuman 所有,欢迎分享本文,转载请保留出处!

发表评论


表情

还没有留言,还不快点抢沙发?