使用Python测试代理IP池深度

发布时间2025-06-26 05:06:59

本工具采用 Python 的 asyncio 异步库结合 aiohttp,实现高并发异步请求代理测试。通过模拟多个请求轮次,可快速检测代理池中实际可用的不同IP数量,评估代理池的健康状况与深度。

为什么国内的动态住宅代理IP这么便宜,而国外的却昂贵?原因在于资源供需和运营成本:国内宽带普及率高,IP资源丰富,代理供应商通过规模化降低了价格;而国外住宅IP稀缺、合规成本高,因此价格普遍更高。

另一个常见误区是“IP数量少就不是代理池”。其实,即便只有100个住宅IP,只要这些IP具备动态轮换机制和稳定性,也能称作动态住宅代理池,并足以支撑中小规模的爬虫任务。

一、为什么要测试代理池深度?

  • 判断真实可用性:很多代理服务商标称几千IP,但实际可用IP远低于宣传。
  • 评估动态性:深度测试能判断IP是否频繁切换,从而规避封禁风险。
  • 优化采购与使用策略:通过结果调整并发量、请求速率,提高采集效率。

二、工具介绍与测试思路

本工具基于 asyncio + aiohttp 实现高并发异步请求,通过多轮访问 httpbin.org/ip 获取出口IP,并统计唯一IP数量、成功率及IP分布,从而评估代理池的深度与质量。

三、代码功能逐步解析

1. 用户配置区

集中管理所有关键参数,便于快速调整测试规模和目标:

CONFIG = {
    "proxy_url": "socks5://user:pass@127.0.0.1:8080",   # SOCKS5代理池入口
    "target_url": "http://httpbin.org/ip",              # 用于返回当前出口IP
    "concurrency": 300,                                 # 每轮并发请求数量
    "test_rounds": 5,                                   # 总测试轮次
    "timeout": 8,                                       # 单个请求超时(秒)
    "user_agents": [                                    # 随机User-Agent列表
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
        "Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X)"
    ]
}

说明:通过调整 concurrencytest_rounds,可模拟不同负载压力下代理池表现。

2. ProxyTester 类初始化与属性

负责封装整个测试逻辑,并提供状态统计容器:

class ProxyTester:
    def __init__(self):
        self.results = []             # 保存所有请求的详细结果
        self.unique_ips = set()       # 存储唯一出口IP,用于计算深度
        self.ip_counter = defaultdict(int)  # 每个IP出现次数统计
        self.success_count = 0        # 成功请求数量
        self.fail_count = 0           # 失败请求数量
        self.session = None           # aiohttp会话对象

逻辑:通过集合与计数字典实现快速统计,后续输出唯一IP数与分布。

3. 异步会话管理与单次请求逻辑

使用 aiohttp 管理连接池,发送请求并解析出口IP:

async def init_session(self):
    conn = aiohttp.TCPConnector(limit=CONFIG["concurrency"])
    self.session = aiohttp.ClientSession(
        connector=conn,
        timeout=aiohttp.ClientTimeout(total=CONFIG["timeout"])
    )

async def close_session(self):
    if self.session:
        await self.session.close()

async def test_proxy(self, round_num: int, request_num: int):
    headers = {"User-Agent": random.choice(CONFIG["user_agents"])}
    try:
        async with self.session.get(
            CONFIG["target_url"],
            proxy=CONFIG["proxy_url"],
            headers=headers,
            ssl=False
        ) as resp:
            if resp.status == 200:
                data = await resp.json()
                ip = data.get("origin", "unknown").split(",")[0].strip()
                self.unique_ips.add(ip)
                self.ip_counter[ip] += 1
                self.success_count += 1
            else:
                self.fail_count += 1
    except Exception:
        self.fail_count += 1

说明:每个请求通过代理发起,若返回状态码为200则记录IP,否则视为失败。

4. 并发执行与轮次控制

批量创建异步任务并行运行,通过多轮次测试观察IP变化趋势:

async def run_test_round(self, round_num: int):
    tasks = [asyncio.create_task(self.test_proxy(round_num, i))
             for i in range(CONFIG["concurrency"])]
    await asyncio.gather(*tasks)
    await asyncio.sleep(0.1)  # 每轮间隔,避免对目标站过载

5. 完整可运行代码

整合以上模块,形成完整测试脚本:

import asyncio
import aiohttp
from collections import defaultdict
import time
import random

# ========== 配置区域 ==========
CONFIG = {
    "proxy_url": "socks5://user:pass@127.0.0.1:8080",
    "target_url": "http://httpbin.org/ip",
    "concurrency": 300,
    "test_rounds": 5,
    "timeout": 8,
    "user_agents": [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
        "Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X)"
    ]
}
# ==============================

class ProxyTester:
    def __init__(self):
        self.results = []
        self.unique_ips = set()
        self.ip_counter = defaultdict(int)
        self.success_count = 0
        self.fail_count = 0
        self.session = None

    async def init_session(self):
        conn = aiohttp.TCPConnector(limit=CONFIG["concurrency"])
        self.session = aiohttp.ClientSession(
            connector=conn,
            timeout=aiohttp.ClientTimeout(total=CONFIG["timeout"])
        )

    async def close_session(self):
        if self.session:
            await self.session.close()

    async def test_proxy(self, round_num: int, request_num: int):
        headers = {"User-Agent": random.choice(CONFIG["user_agents"])}
        try:
            async with self.session.get(
                CONFIG["target_url"],
                proxy=CONFIG["proxy_url"],
                headers=headers,
                ssl=False
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    ip = data.get("origin", "unknown").split(",")[0].strip()
                    self.unique_ips.add(ip)
                    self.ip_counter[ip] += 1
                    self.success_count += 1
                else:
                    self.fail_count += 1
        except Exception:
            self.fail_count += 1

    async def run_test_round(self, round_num: int):
        tasks = [asyncio.create_task(self.test_proxy(round_num, i))
                 for i in range(CONFIG["concurrency"])]
        await asyncio.gather(*tasks)
        await asyncio.sleep(0.1)

    def print_stats(self):
        print("\n" + "="*60)
        print("动态代理IP池深度测试结果".center(60))
        print("="*60)
        print(f"并发: {CONFIG['concurrency']} × 轮次: {CONFIG['test_rounds']}")
        print(f"总请求: {self.success_count + self.fail_count}")
        print(f"成功: {self.success_count}  失败: {self.fail_count}")
        print(f"成功率: {self.success_count/(self.success_count+self.fail_count)*100:.2f}%")
        print(f"唯一IP数量: {len(self.unique_ips)}")
        print("\nIP分布Top20:")
        for ip, count in sorted(self.ip_counter.items(), key=lambda x: x[1], reverse=True)[:20]:
            print(f"{ip:<18} : {count}次")
        if len(self.ip_counter) > 20:
            print(f"...(共 {len(self.ip_counter)} 个不同IP)")

async def main():
    tester = ProxyTester()
    await tester.init_session()

    start_time = time.time()
    print(f"\n开始高并发代理测试: {CONFIG['concurrency']}并发 × {CONFIG['test_rounds']}轮次")
    print(f"目标URL: {CONFIG['target_url']}")
    print(f"代理地址: {CONFIG['proxy_url']}\n")

    for round_num in range(1, CONFIG["test_rounds"] + 1):
        print(f"正在执行第 {round_num}/{CONFIG['test_rounds']} 轮测试...")
        await tester.run_test_round(round_num)

    await tester.close_session()
    tester.print_stats()

    total_time = time.time() - start_time
    print(f"\n总测试时间: {total_time:.2f}秒")
    print(f"平均每秒请求: {(tester.success_count + tester.fail_count)/total_time:.2f}")
    print("测试完成\n")

if __name__ == "__main__":
    try:
        import aiohttp
    except ImportError:
        print("请先安装依赖: pip install aiohttp")
        exit(1)

    if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    asyncio.run(main())

四、结果解读与实用建议

  • 唯一IP数量:直接反映代理池深度。数量多表示资源丰富;数量少需评估是否满足业务需求。
  • 成功率:成功率低于80%需检查代理服务稳定性或目标站屏蔽机制。
  • IP分布:若个别IP出现次数过多,表示动态性不足,易触发封禁。
  • 测试周期:建议分时段、多日测试,以判断代理池在不同时间段的表现。

五、总结

本文提供了一套完整的动态代理IP池深度测试方案,结合高并发异步请求与多轮次统计,能快速评估代理池的质量与动态性。通过该工具,你可以:

  • 确认代理池真实可用IP数量(深度)
  • 分析成功率与稳定性
  • 评估代理池动态切换特征

即便是100个IP的小型代理池,只要动态切换频繁且成功率高,也能胜任中小型采集任务;而对于大规模采集,则需配合更高深度的代理池和分布式架构。