#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多线程版 HTTP/HTTPS 代理(带 IP 白名单 + 域名白名单,兼容 Python 3.5.2)
监听 0.0.0.0:10225,仅允许白名单内 IP 客户端连接,仅允许访问白名单内域名
"""
import socket
import select
import signal
import sys
import time
import threading
import ipaddress
import re
# ==================== IP 白名单配置(可自定义修改) ====================
# 支持两种格式:
# 1. 单个 IP 地址(如 "127.0.0.1"、"192.168.1.100")
# 2. C 段/网段(如 "192.168.1.0/24"、"10.0.0.0/8")
IP_WHITELIST = [
"127.0.0.1", # 本机回环地址(必选,用于本机测试)
#"0.0.0.0/0", # 允许所有 IP(生产环境请修改为具体网段)
"20.20.20.0/24", # 内网 C 段(可根据你的内网网段修改)
"10.10.0.0/16"
# "119.6.57.173", # 可添加指定的外部公网 IP
# "220.181.0.0/16", # 可添加指定的公网网段
]
# ==================================================================
# ==================== 域名白名单配置(新增) ====================
# 支持两种格式:
# 1. 精确域名匹配(如 "www.baidu.com"、"api.github.com")
# 2. 通配符匹配(如 "*.baidu.com" 匹配所有 baidu.com 子域名)
# - 注意:通配符只匹配一级子域名,如 "*.example.com" 匹配 "www.example.com" 但不匹配 "sub.www.example.com"
# - 如需匹配多级子域名,请使用多个规则或使用 "*." 开头的规则组合
DOMAIN_WHITELIST = [
# 百度系
"*",
"*.baidu.com", # 匹配所有 baidu.com 子域名
# GitHub
"github.com",
"*.github.com",
# GitHub
"api.loadteam.com",
"*.loadteam.com"
# 示例:允许所有域名(测试用,生产环境请删除)
#"*", # 通配符 * 表示允许所有域名(慎用)
]
# ==================================================================
# 代理基础配置项
LISTEN_IP, LISTEN_PORT = '0.0.0.0', 8080 #指定代理端口
BUFFER_SIZE = 16384
TIMEOUT_SECONDS = 60
MAX_HEADER_SIZE = 65536
MAX_THREADS = 100 # 最大线程数限制
THREAD_POOL = [] # 线程池
# 全局变量 + 线程锁
ACTIVE_CONNECTIONS = 0
CONN_LOCK = threading.Lock()
THREAD_LOCK = threading.Lock()
def is_allowed_domain(target_host):
"""
校验目标域名是否在白名单内(新增函数)
:param target_host: 目标域名(字符串格式,如 "www.baidu.com")
:return: True(允许访问)/ False(拒绝访问)
"""
if not target_host:
return False
# 转换为小写进行不区分大小写的匹配
host = target_host.lower().strip()
for pattern in DOMAIN_WHITELIST:
pattern = pattern.lower().strip()
# 完全通配符:允许所有域名
if pattern == "*":
return True
# 通配符匹配:*.example.com
if pattern.startswith("*."):
# 去掉通配符后的域名后缀
suffix = pattern[1:] # 如 ".baidu.com"
# 检查是否以该后缀结尾,或者完全等于去掉通配符后的部分(如 "baidu.com" 匹配 "*.baidu.com")
if host.endswith(suffix) or host == suffix[1:]:
return True
# 精确匹配
if host == pattern:
return True
return False
def is_allowed_ip(client_ip):
"""
校验客户端 IP 是否在白名单内(兼容 Python 3.5.2)
:param client_ip: 客户端 IP 地址(字符串格式)
:return: True(允许连接)/ False(拒绝连接)
"""
if not client_ip:
return False
try:
# 将客户端 IP 转为 ip_address 对象
client_ip_obj = ipaddress.ip_address(client_ip)
# 遍历白名单,逐一校验
for allowed_item in IP_WHITELIST:
try:
# Python 3.5.2 ip_network 不支持 strict=False,直接尝试网段匹配
allowed_network = ipaddress.ip_network(allowed_item)
if client_ip_obj in allowed_network:
return True
except ValueError:
# 网段匹配失败,按单个 IP 精确匹配
try:
allowed_ip_obj = ipaddress.ip_address(allowed_item)
if client_ip_obj == allowed_ip_obj:
return True
except:
# 非法 IP 格式,跳过该白名单项
continue
# 未匹配到白名单
return False
except Exception as e:
log("IP 校验异常 | 客户端 IP: {} | 错误: {}".format(client_ip, e))
return False
def log(msg):
"""带时间戳、连接数的线程安全日志(使用 format 兼容 Python 3.5)"""
global ACTIVE_CONNECTIONS
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
with CONN_LOCK:
print("[{}] [CONNS:{}] {}".format(timestamp, ACTIVE_CONNECTIONS, msg), flush=True)
def set_socket_opt(sock):
"""优化套接字参数"""
try:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 5)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, BUFFER_SIZE * 4)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, BUFFER_SIZE * 4)
sock.settimeout(TIMEOUT_SECONDS)
except Exception as e:
log("套接字参数优化失败: {}".format(e))
def relay_bidirectional(client_sock, remote_sock, desc=""):
"""鲁棒的双向数据转发(兼容 Python 3.5.2)"""
global ACTIVE_CONNECTIONS
bytes_sent = 0
bytes_recv = 0
try:
set_socket_opt(client_sock)
set_socket_opt(remote_sock)
read_list = [client_sock, remote_sock]
log("{}: 开始双向转发 (缓冲区:{} 超时:{}s)".format(desc, BUFFER_SIZE, TIMEOUT_SECONDS))
while read_list:
rlist, _, xlist = select.select(read_list, [], read_list, TIMEOUT_SECONDS)
if xlist:
log("{}: 套接字异常 {}".format(desc, xlist))
break
if not rlist:
if bytes_sent + bytes_recv > 0:
log("{}: 超时但已转发数据,继续等待 (已发:{} 已收:{})".format(desc, bytes_sent, bytes_recv))
continue
log("{}: 无数据转发超时退出".format(desc))
break
for sock in rlist:
try:
data = sock.recv(BUFFER_SIZE)
if not data:
try:
test_data = sock.recv(1, socket.MSG_PEEK)
if not test_data:
log("{}: 确认对方关闭连接 (已转发: {} 字节)".format(desc, bytes_sent+bytes_recv))
return
except:
pass
continue
if sock is client_sock:
target = remote_sock
bytes_sent += len(data)
log("{}: 客户端→目标 [{} 字节] (累计:{})".format(desc, len(data), bytes_sent))
else:
target = client_sock
bytes_recv += len(data)
log("{}: 目标→客户端 [{} 字节] (累计:{})".format(desc, len(data), bytes_recv))
target.sendall(data)
except BlockingIOError:
continue
except socket.timeout:
log("{}: 单方向数据读取超时,继续等待".format(desc))
continue
except Exception as e:
log("{}: 转发错误 {}: {}".format(desc, type(e).__name__, e))
return
except Exception as e:
log("{}: 转发异常 {}: {}".format(desc, type(e).__name__, e))
finally:
with CONN_LOCK:
ACTIVE_CONNECTIONS -= 1
# 优雅关闭套接字
for s in [client_sock, remote_sock]:
try:
s.shutdown(socket.SHUT_RDWR)
except:
pass
try:
s.close()
except:
pass
log("{}: 转发结束 (总计转发: {} 字节)".format(desc, bytes_sent+bytes_recv))
def parse_target(request_line):
"""解析目标地址(兼容 Python 3.5.2)"""
try:
if request_line.startswith('CONNECT'):
target = request_line.split()[1]
if ':' in target:
host, port = target.split(':', 1)
port = int(port)
else:
host = target
port = 443
return host, port
else:
from urllib.parse import urlparse
url = request_line.split()[1]
parsed = urlparse(url)
host = parsed.hostname
port = parsed.port or (80 if parsed.scheme == 'http' else 443)
return host, port
except Exception as e:
log("解析目标失败: {} | 请求行: {}".format(e, request_line[:100]))
return None, None
def handle_connect(client_sock, request_line, client_addr):
"""处理 HTTPS CONNECT 隧道(增加域名白名单校验)"""
host, port = parse_target(request_line)
if not host or not port:
client_sock.sendall(b'HTTP/1.1 400 Bad Request\r\n\r\nInvalid target')
return
# ==================== 域名白名单校验(新增) ====================
if not is_allowed_domain(host):
log("❌ 拒绝访问非白名单域名 | 客户端: {} | 目标: {}:{}".format(client_addr, host, port))
error_response = (
b"HTTP/1.1 403 Forbidden\r\n"
b"Content-Type: text/plain; charset=utf-8\r\n"
b"Connection: close\r\n"
b"\r\n"
b"403 Forbidden: Domain not in whitelist\r\n"
)
try:
client_sock.sendall(error_response)
except:
pass
return
# ==================================================================
try:
remote_sock = None
for retry in range(2):
try:
remote_sock = socket.create_connection((host, port), timeout=10)
break
except:
time.sleep(0.1)
if not remote_sock:
raise Exception("连接目标失败(2次重试)")
response = (
b"HTTP/1.1 200 Connection Established\r\n"
b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S GMT").encode() + b"\r\n"
b"Server: SimpleProxy/1.0\r\n"
b"Connection: keep-alive\r\n"
b"Proxy-Connection: keep-alive\r\n"
b"\r\n"
)
client_sock.sendall(response)
relay_bidirectional(
client_sock, remote_sock,
"{} <-> {}:{} (HTTPS)".format(client_addr, host, port)
)
except Exception as e:
error_msg = "HTTP/1.1 502 Bad Gateway\r\nContent-Type: text/plain\r\n\r\n{}".format(str(e))
try:
client_sock.sendall(error_msg.encode('utf-8', errors='replace'))
except:
pass
log("{}: CONNECT 失败 {}:{} | {}".format(client_addr, host, port, e))
def handle_http(client_sock, request_data, client_addr):
"""处理 HTTP 请求(增加域名白名单校验)"""
try:
header_end = request_data.find(b'\r\n\r\n')
if header_end == -1:
client_sock.sendall(b'HTTP/1.1 400 Bad Request\r\n\r\nMissing header end')
return
request_line = request_data[:header_end].split(b'\r\n')[0].decode('utf-8', errors='ignore')
host, port = parse_target(request_line)
if not host or not port:
client_sock.sendall(b'HTTP/1.1 400 Bad Request\r\n\r\nInvalid URL')
return
# ==================== 域名白名单校验(新增) ====================
if not is_allowed_domain(host):
log("❌ 拒绝访问非白名单域名 | 客户端: {} | 目标: {}:{}".format(client_addr, host, port))
error_response = (
b"HTTP/1.1 403 Forbidden\r\n"
b"Content-Type: text/plain; charset=utf-8\r\n"
b"Connection: close\r\n"
b"\r\n"
b"403 Forbidden: Domain not in whitelist\r\n"
)
try:
client_sock.sendall(error_response)
except:
pass
return
# ==================================================================
header_lines = request_data[:header_end].split(b'\r\n')
new_headers = []
has_host = False
for line in header_lines:
lower_line = line.lower()
if any(p in lower_line for p in [b'proxy-', b'connection: keep-alive']):
continue
if lower_line.startswith(b'host:'):
has_host = True
new_headers.append(line)
if not has_host:
new_headers.insert(1, "Host: {}".format(host).encode())
new_headers.append(b"Connection: keep-alive")
new_headers.append(b"Accept-Encoding: gzip, deflate")
new_header = b'\r\n'.join(new_headers)
body = request_data[header_end+4:]
forward_data = new_header + b'\r\n\r\n' + body
remote_sock = socket.create_connection((host, port), timeout=10)
set_socket_opt(remote_sock)
remote_sock.sendall(forward_data)
relay_bidirectional(
client_sock, remote_sock,
"{} <-> {}:{} (HTTP)".format(client_addr, host, port)
)
except Exception as e:
error_msg = "HTTP/1.1 500 Internal Server Error\r\n\r\n{}".format(str(e))
try:
client_sock.sendall(error_msg.encode())
except:
pass
log("{}: HTTP 处理失败 | {}".format(client_addr, e))
def handle_client(client_sock, client_addr):
"""处理单个客户端连接(工作线程执行函数,兼容 Python 3.5.2)"""
global ACTIVE_CONNECTIONS
with CONN_LOCK:
ACTIVE_CONNECTIONS += 1
try:
client_sock.settimeout(TIMEOUT_SECONDS)
set_socket_opt(client_sock)
request_data = b''
while len(request_data) < MAX_HEADER_SIZE:
chunk = client_sock.recv(BUFFER_SIZE)
if not chunk:
return
request_data += chunk
if b'\r\n\r\n' in request_data:
break
if not request_data:
log("{}: 空请求,关闭连接".format(client_addr))
return
first_line = request_data.split(b'\r\n')[0].decode('utf-8', errors='ignore').strip()
log("{}: 收到请求 | {}".format(client_addr, first_line[:100]))
if first_line.upper().startswith('CONNECT'):
handle_connect(client_sock, first_line, client_addr)
else:
handle_http(client_sock, request_data, client_addr)
except socket.timeout:
log("{}: 客户端超时({}s)".format(client_addr, TIMEOUT_SECONDS))
except Exception as e:
log("{}: 处理异常 {}: {}".format(client_addr, type(e).__name__, e))
finally:
try:
client_sock.close()
except:
pass
# 清理线程池中的无效线程(兼容 Python 3.5 列表操作)
with THREAD_LOCK:
# 先创建新列表,再赋值给 THREAD_POOL,避免迭代时修改列表
valid_threads = []
for thread in THREAD_POOL:
if thread.is_alive():
valid_threads.append(thread)
THREAD_POOL[:] = valid_threads
def main():
"""主服务入口(IP 过滤 + 线程分配,兼容 Python 3.5.2)"""
def signal_handler(_sig, _frame):
log("收到退出信号,正在关闭所有线程...")
with THREAD_LOCK:
for thread in THREAD_POOL:
thread.join(timeout=2)
log("所有线程已关闭,代理退出")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
server_sock.bind((LISTEN_IP, LISTEN_PORT))
server_sock.listen(512)
server_sock.settimeout(1.0)
log("多线程代理服务器启动成功 ✅ | {}:{}".format(LISTEN_IP, LISTEN_PORT))
log("IP 白名单: {}".format(IP_WHITELIST))
log("域名白名单: {}".format(DOMAIN_WHITELIST)) # 新增日志输出
log("最大并发线程数: {}".format(MAX_THREADS))
log("测试命令: curl -x http://127.0.0.1:{} https://www.baidu.com".format(LISTEN_PORT))
while True:
try:
# 接受新客户端连接
client_sock, client_addr = server_sock.accept()
client_ip = client_addr[0] # 提取客户端 IP 地址(如 192.168.1.100)
# ==================== IP 白名单过滤(核心拦截点) ====================
if not is_allowed_ip(client_ip):
# 非白名单 IP,直接关闭连接并记录日志
log("❌ 拒绝非白名单 IP 连接 | 客户端 IP: {} | 地址: {}".format(client_ip, client_addr))
client_sock.close()
continue
# ==================================================================
# 控制线程数,避免资源耗尽
with THREAD_LOCK:
# 清理已结束的线程
valid_threads = []
for thread in THREAD_POOL:
if thread.is_alive():
valid_threads.append(thread)
THREAD_POOL[:] = valid_threads
# 若线程数达到上限,等待一段时间
if len(THREAD_POOL) >= MAX_THREADS:
log("线程数达到上限 {},等待空闲线程...".format(MAX_THREADS))
time.sleep(1)
continue
# 创建新线程处理客户端连接
client_thread = threading.Thread(
target=handle_client,
args=(client_sock, client_addr),
daemon=True # 守护线程,主程序退出时自动销毁
)
# 将线程加入线程池
with THREAD_LOCK:
THREAD_POOL.append(client_thread)
# 启动线程
client_thread.start()
log("✅ {}: 已分配工作线程(当前线程数: {})".format(client_addr, len(THREAD_POOL)))
except socket.timeout:
continue
except Exception as e:
log("主循环异常: {}".format(e))
except PermissionError:
log("权限不足 ❌ | 无法绑定 {}(需 sudo 或端口>1024)".format(LISTEN_PORT))
sys.exit(1)
except Exception as e:
log("启动失败 ❌ | {}".format(e))
sys.exit(1)
finally:
try:
server_sock.close()
except:
pass
if __name__ == '__main__':
main()
本文地址:https://zhaoshuman.cn/%E6%8A%80%E6%9C%AF%E5%88%86%E4%BA%AB/10.html
免责声明:本文为原创文章,版权归 zhaoshuman 所有,欢迎分享本文,转载请保留出处!
免责声明:本文为原创文章,版权归 zhaoshuman 所有,欢迎分享本文,转载请保留出处!
发表评论