使用Python测试代理IP池深度
发布时间2025-06-26 05:06:59本工具采用 Python 的 asyncio 异步库结合 aiohttp,实现高并发异步请求代理测试。通过模拟多个请求轮次,可快速检测代理池中实际可用的不同IP数量,评估代理池的健康状况与深度。
为什么国内的动态住宅代理IP这么便宜,而国外的却昂贵?原因在于资源供需和运营成本:国内宽带普及率高,IP资源丰富,代理供应商通过规模化降低了价格;而国外住宅IP稀缺、合规成本高,因此价格普遍更高。
另一个常见误区是“IP数量少就不是代理池”。其实,即便只有100个住宅IP,只要这些IP具备动态轮换机制和稳定性,也能称作动态住宅代理池,并足以支撑中小规模的爬虫任务。
一、为什么要测试代理池深度?
- 判断真实可用性:很多代理服务商标称几千IP,但实际可用IP远低于宣传。
- 评估动态性:深度测试能判断IP是否频繁切换,从而规避封禁风险。
- 优化采购与使用策略:通过结果调整并发量、请求速率,提高采集效率。
二、工具介绍与测试思路
本工具基于 asyncio
+ aiohttp
实现高并发异步请求,通过多轮访问 httpbin.org/ip
获取出口IP,并统计唯一IP数量、成功率及IP分布,从而评估代理池的深度与质量。
三、代码功能逐步解析
1. 用户配置区
集中管理所有关键参数,便于快速调整测试规模和目标:
CONFIG = {
"proxy_url": "socks5://user:pass@127.0.0.1:8080", # SOCKS5代理池入口
"target_url": "http://httpbin.org/ip", # 用于返回当前出口IP
"concurrency": 300, # 每轮并发请求数量
"test_rounds": 5, # 总测试轮次
"timeout": 8, # 单个请求超时(秒)
"user_agents": [ # 随机User-Agent列表
"Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
"Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X)"
]
}
说明:通过调整 concurrency
和 test_rounds
,可模拟不同负载压力下代理池表现。
2. ProxyTester 类初始化与属性
负责封装整个测试逻辑,并提供状态统计容器:
class ProxyTester:
def __init__(self):
self.results = [] # 保存所有请求的详细结果
self.unique_ips = set() # 存储唯一出口IP,用于计算深度
self.ip_counter = defaultdict(int) # 每个IP出现次数统计
self.success_count = 0 # 成功请求数量
self.fail_count = 0 # 失败请求数量
self.session = None # aiohttp会话对象
逻辑:通过集合与计数字典实现快速统计,后续输出唯一IP数与分布。
3. 异步会话管理与单次请求逻辑
使用 aiohttp
管理连接池,发送请求并解析出口IP:
async def init_session(self):
conn = aiohttp.TCPConnector(limit=CONFIG["concurrency"])
self.session = aiohttp.ClientSession(
connector=conn,
timeout=aiohttp.ClientTimeout(total=CONFIG["timeout"])
)
async def close_session(self):
if self.session:
await self.session.close()
async def test_proxy(self, round_num: int, request_num: int):
headers = {"User-Agent": random.choice(CONFIG["user_agents"])}
try:
async with self.session.get(
CONFIG["target_url"],
proxy=CONFIG["proxy_url"],
headers=headers,
ssl=False
) as resp:
if resp.status == 200:
data = await resp.json()
ip = data.get("origin", "unknown").split(",")[0].strip()
self.unique_ips.add(ip)
self.ip_counter[ip] += 1
self.success_count += 1
else:
self.fail_count += 1
except Exception:
self.fail_count += 1
说明:每个请求通过代理发起,若返回状态码为200则记录IP,否则视为失败。
4. 并发执行与轮次控制
批量创建异步任务并行运行,通过多轮次测试观察IP变化趋势:
async def run_test_round(self, round_num: int):
tasks = [asyncio.create_task(self.test_proxy(round_num, i))
for i in range(CONFIG["concurrency"])]
await asyncio.gather(*tasks)
await asyncio.sleep(0.1) # 每轮间隔,避免对目标站过载
5. 完整可运行代码
整合以上模块,形成完整测试脚本:
import asyncio
import aiohttp
from collections import defaultdict
import time
import random
# ========== 配置区域 ==========
CONFIG = {
"proxy_url": "socks5://user:pass@127.0.0.1:8080",
"target_url": "http://httpbin.org/ip",
"concurrency": 300,
"test_rounds": 5,
"timeout": 8,
"user_agents": [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
"Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X)"
]
}
# ==============================
class ProxyTester:
def __init__(self):
self.results = []
self.unique_ips = set()
self.ip_counter = defaultdict(int)
self.success_count = 0
self.fail_count = 0
self.session = None
async def init_session(self):
conn = aiohttp.TCPConnector(limit=CONFIG["concurrency"])
self.session = aiohttp.ClientSession(
connector=conn,
timeout=aiohttp.ClientTimeout(total=CONFIG["timeout"])
)
async def close_session(self):
if self.session:
await self.session.close()
async def test_proxy(self, round_num: int, request_num: int):
headers = {"User-Agent": random.choice(CONFIG["user_agents"])}
try:
async with self.session.get(
CONFIG["target_url"],
proxy=CONFIG["proxy_url"],
headers=headers,
ssl=False
) as resp:
if resp.status == 200:
data = await resp.json()
ip = data.get("origin", "unknown").split(",")[0].strip()
self.unique_ips.add(ip)
self.ip_counter[ip] += 1
self.success_count += 1
else:
self.fail_count += 1
except Exception:
self.fail_count += 1
async def run_test_round(self, round_num: int):
tasks = [asyncio.create_task(self.test_proxy(round_num, i))
for i in range(CONFIG["concurrency"])]
await asyncio.gather(*tasks)
await asyncio.sleep(0.1)
def print_stats(self):
print("\n" + "="*60)
print("动态代理IP池深度测试结果".center(60))
print("="*60)
print(f"并发: {CONFIG['concurrency']} × 轮次: {CONFIG['test_rounds']}")
print(f"总请求: {self.success_count + self.fail_count}")
print(f"成功: {self.success_count} 失败: {self.fail_count}")
print(f"成功率: {self.success_count/(self.success_count+self.fail_count)*100:.2f}%")
print(f"唯一IP数量: {len(self.unique_ips)}")
print("\nIP分布Top20:")
for ip, count in sorted(self.ip_counter.items(), key=lambda x: x[1], reverse=True)[:20]:
print(f"{ip:<18} : {count}次")
if len(self.ip_counter) > 20:
print(f"...(共 {len(self.ip_counter)} 个不同IP)")
async def main():
tester = ProxyTester()
await tester.init_session()
start_time = time.time()
print(f"\n开始高并发代理测试: {CONFIG['concurrency']}并发 × {CONFIG['test_rounds']}轮次")
print(f"目标URL: {CONFIG['target_url']}")
print(f"代理地址: {CONFIG['proxy_url']}\n")
for round_num in range(1, CONFIG["test_rounds"] + 1):
print(f"正在执行第 {round_num}/{CONFIG['test_rounds']} 轮测试...")
await tester.run_test_round(round_num)
await tester.close_session()
tester.print_stats()
total_time = time.time() - start_time
print(f"\n总测试时间: {total_time:.2f}秒")
print(f"平均每秒请求: {(tester.success_count + tester.fail_count)/total_time:.2f}")
print("测试完成\n")
if __name__ == "__main__":
try:
import aiohttp
except ImportError:
print("请先安装依赖: pip install aiohttp")
exit(1)
if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
四、结果解读与实用建议
- 唯一IP数量:直接反映代理池深度。数量多表示资源丰富;数量少需评估是否满足业务需求。
- 成功率:成功率低于80%需检查代理服务稳定性或目标站屏蔽机制。
- IP分布:若个别IP出现次数过多,表示动态性不足,易触发封禁。
- 测试周期:建议分时段、多日测试,以判断代理池在不同时间段的表现。
五、总结
本文提供了一套完整的动态代理IP池深度测试方案,结合高并发异步请求与多轮次统计,能快速评估代理池的质量与动态性。通过该工具,你可以:
- 确认代理池真实可用IP数量(深度)
- 分析成功率与稳定性
- 评估代理池动态切换特征
即便是100个IP的小型代理池,只要动态切换频繁且成功率高,也能胜任中小型采集任务;而对于大规模采集,则需配合更高深度的代理池和分布式架构。