代码

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
图片爬虫脚本
从photo.json中提取big图片地址,拼接URL并下载保存
"""

import json
import os
import requests
from pathlib import Path
from urllib.parse import urljoin
import concurrent.futures
from typing import List, Dict
import time

# Configuration parameters
BASE_URL = "https://p.chinarun.com"
SAVE_DIR = "./data/down_photo_2025"
JSON_FILE = "./data_demo/photo2025.json"
MAX_WORKERS = 5  # number of concurrent download threads
TIMEOUT = 30  # per-request download timeout (seconds)
RETRY_TIMES = 3  # retry attempts per image
VERIFY_SSL = False  # verify SSL certificates? (False = skip SSL verification)


class ImageDownloader:
    """Concurrent image downloader.

    Loads photo records from a JSON file, builds absolute URLs for the
    "big" images and downloads them into ``save_dir`` with a thread pool,
    retrying transient failures and recording URLs that never succeeded.
    """

    def __init__(self, save_dir: str, max_workers: int = 5):
        """
        Args:
            save_dir: Directory where downloaded images are stored.
            max_workers: Number of concurrent download threads.
        """
        # pathlib transparently handles OS-specific path separators and
        # supports exists()/mkdir()/.parent/.name plus "/" path joining.
        self.save_dir = Path(save_dir)
        self.max_workers = max_workers
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
        })
        # Suppress the InsecureRequestWarning that requests/urllib3 emit
        # when SSL verification is disabled.
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        # Honour the module-level SSL verification switch.
        self.session.verify = VERIFY_SSL
        self.success_count = 0   # images downloaded (or already present)
        self.failed_count = 0    # images that failed after all retries
        self.failed_urls = []    # URLs of the permanently failed images

    def ensure_save_dir(self):
        """Create the save directory (including parents) if missing."""
        self.save_dir.mkdir(parents=True, exist_ok=True)
        print(f"✓ 保存目录: {self.save_dir}")

    def load_json_data(self, json_file: str) -> List[Dict]:
        """Load photo records from *json_file*.

        Expected content is a JSON array, e.g.::

            [
                {
                    "big": "/upload/photos/2025/2025bjcsfzxmls/big/20250419105640_253_000107.jpg",
                    "bigsize": "1080,1624",
                    "gid": 3326809,
                    "id": 315755520,
                    "mid": 315755520,
                    "price": "",
                    "small": "/upload/photos/2025/2025bjcsfzxmls/small/20250419105640_253_000107.jpg",
                    "ts": "1745031400",
                    "type": "photo"
                }
            ]

        Returns:
            The parsed list of records, or an empty list on any error.
        """
        print(f"📖 加载JSON文件: {json_file}")
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            print(f"✓ 成功加载 {len(data)} 条记录")
            return data
        except Exception as e:
            # Best-effort: report the problem and return an empty list so
            # the caller can abort gracefully instead of crashing.
            print(f"✗ 加载JSON文件失败: {e}")
            return []

    def extract_image_urls(self, data: List[Dict]) -> List[Dict]:
        """Extract the "big" image URLs from the loaded records.

        Note: the original annotation said ``List[str]`` but the method has
        always returned a list of dicts; the annotation is corrected here.

        Returns:
            A list of dicts with keys 'url' (absolute URL), 'id' (record id,
            falling back to the list index) and 'path' (relative path).
        """
        print("\n🔍 提取图片地址...")
        urls = []
        for i, item in enumerate(data):
            # Skip records without a non-empty "big" path.
            if 'big' in item and item['big']:
                big_path = item['big']
                # Join the relative path onto the site base URL.
                full_url = urljoin(BASE_URL, big_path)
                urls.append({
                    'url': full_url,
                    'id': item.get('id', i),
                    'path': big_path
                })

        print(f"✓ 提取到 {len(urls)} 个图片地址")
        return urls

    def download_image(self, image_info: Dict) -> bool:
        """Download a single image, retrying up to RETRY_TIMES times.

        Args:
            image_info: Dict with 'url', 'id' and 'path' keys, as produced
                by :meth:`extract_image_urls`.

        Returns:
            True if the file was downloaded or already existed, else False.
        """
        url = image_info['url']
        image_id = image_info['id']
        path = image_info['path']

        # Prefix the record id so files with identical basenames can't clash.
        filename = f"{image_id}_{os.path.basename(path)}"
        save_path = self.save_dir / filename

        # Skip files already downloaded on a previous run (resume support).
        if save_path.exists():
            print(f"  ✓ 已存在: {filename}")
            return True

        # Download with retry on any request/IO error.
        for retry in range(RETRY_TIMES):
            try:
                response = self.session.get(url, timeout=TIMEOUT)
                response.raise_for_status()

                # Persist the image bytes.
                with open(save_path, 'wb') as f:
                    f.write(response.content)

                file_size = len(response.content) / 1024  # KB
                print(f"  ✓ 下载成功: {filename} ({file_size:.1f} KB)")
                return True

            except Exception as e:
                if retry < RETRY_TIMES - 1:
                    # Brief pause before retrying a transient failure.
                    print(f"  ⚠ 重试 {retry + 1}/{RETRY_TIMES}: {filename}")
                    time.sleep(1)
                else:
                    print(f"  ✗ 下载失败: {filename} - {e}")
                    return False

        return False

    def download_all(self, urls: List[Dict]):
        """Download all images concurrently and print a summary at the end.

        Args:
            urls: List of image-info dicts from :meth:`extract_image_urls`.
        """
        print(f"\n🚀 开始下载 {len(urls)} 张图片...")
        print(f"   并发数: {self.max_workers}")
        print(f"   保存位置: {self.save_dir}")
        print("=" * 80)

        start_time = time.time()

        # Fan the downloads out over a thread pool; downloads are I/O-bound,
        # so threads overlap the network waits.
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self.download_image, url_info): url_info
                for url_info in urls
            }

            for future in concurrent.futures.as_completed(futures):
                url_info = futures[future]
                try:
                    success = future.result()
                    if success:
                        self.success_count += 1
                    else:
                        self.failed_count += 1
                        self.failed_urls.append(url_info['url'])
                except Exception as e:
                    # download_image normally returns False on failure; this
                    # catches anything unexpected that escaped it.
                    self.failed_count += 1
                    self.failed_urls.append(url_info['url'])
                    print(f"  ✗ 异常: {url_info['url']} - {e}")

        elapsed_time = time.time() - start_time
        self.print_summary(elapsed_time, len(urls))

    def print_summary(self, elapsed_time: float, total: int):
        """Print download statistics and dump any failed URLs to a file.

        Args:
            elapsed_time: Total elapsed time in seconds.
            total: Total number of download attempts.
        """
        print("=" * 80)
        print(f"\n📊 下载完成!")
        print(f"   总计:     {total} 张")
        print(f"   成功:     {self.success_count} 张")
        print(f"   失败:     {self.failed_count} 张")
        print(f"   耗时:     {elapsed_time:.1f} 秒")
        if self.success_count > 0:
            print(f"   平均速度: {self.success_count / elapsed_time:.2f} 张/秒")

        # Persist failed URLs so a follow-up run can retry just those.
        if self.failed_urls:
            print(f"\n❌ 失败的URL已保存到: {self.save_dir / 'failed_urls.txt'}")
            with open(self.save_dir / 'failed_urls.txt', 'w', encoding='utf-8') as f:
                for url in self.failed_urls:
                    f.write(f"{url}\n")


def main():
    """Entry point: load photo JSON, preview URLs, confirm, then download."""
    print("=" * 80)
    print("🖼️  图片爬虫工具")
    print("=" * 80)

    # Build the downloader from the module-level configuration.
    dl = ImageDownloader(save_dir=SAVE_DIR, max_workers=MAX_WORKERS)
    dl.ensure_save_dir()

    # Load records; bail out early if the file is empty or unreadable.
    records = dl.load_json_data(JSON_FILE)
    if not records:
        print("✗ 没有数据,退出")
        return

    # Build the absolute URL list; bail out if nothing was extracted.
    image_urls = dl.extract_image_urls(records)
    if not image_urls:
        print("✗ 没有提取到图片地址,退出")
        return

    # Show the first five URLs so the user can sanity-check them.
    print("\n📋 URL预览(前5个):")
    for idx, entry in enumerate(image_urls[:5], start=1):
        print(f"   {idx}. {entry['url']}")
    remaining = len(image_urls) - 5
    if remaining > 0:
        print(f"   ... 还有 {remaining} 个")

    # Ask for confirmation before touching the network.
    answer = input(f"\n是否继续下载?(y/n): ").strip().lower()
    if answer != 'y':
        print("❌ 已取消")
        return

    dl.download_all(image_urls)

    print("\n✨ 所有任务完成!")


if __name__ == "__main__":
    # Script entry point: run main() and report how the run ended.
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C: exit quietly without a noisy traceback.
        print("\n\n❌ 用户中断")
    except Exception as e:
        # Unexpected failure: show the message plus the full traceback.
        print(f"\n❌ 发生错误: {e}")
        import traceback
        traceback.print_exc()
作者:海马  创建时间:2026-04-20 17:19
最后编辑:海马  更新时间:2026-04-30 16:06
上一篇:
下一篇: