代码
import time
import requests
from urllib.parse import urljoin
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_data(url, timeout=10):
    """Request ``url`` with a browser-like User-Agent and return its JSON body.

    Args:
        url: Address of the JSON endpoint to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The decoded JSON payload when the server answers with HTTP 200 and a
        valid JSON body. ``None`` on a network error, a non-200 status code,
        or a body that cannot be decoded as JSON; a message is printed in
        each of those cases.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        # Connection errors and timeouts are reported the same way as a bad
        # status code so the caller only has to check for None.
        print(f"请求失败,错误:{exc}")
        return None

    if response.status_code != 200:
        print(f"请求失败,状态码:{response.status_code}")
        return None

    try:
        return response.json()
    except ValueError as exc:
        # A 200 response is not guaranteed to carry JSON (e.g. an HTML
        # challenge page); requests signals that with a ValueError subclass.
        print(f"响应不是有效的 JSON:{exc}")
        return None
def save_image(image_url, title, timeout=10):
    """Download ``image_url`` and write it to ``movie_images/<title><ext>``.

    Args:
        image_url: Address of the image to download. The file extension is
            taken from the path component of this URL.
        title: Name used for the saved file and in the status messages.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        None. A success or failure message is printed.

    Raises:
        requests.RequestException: On a network error or timeout.
        OSError: If the directory or file cannot be written.
    """
    # Local import so this function does not depend on a change to the
    # file-level import list.
    from urllib.parse import urlsplit

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    response = requests.get(image_url, headers=headers, timeout=timeout)
    if response.status_code != 200:
        print(f"{title} 图片下载失败")
        return

    # This function is submitted to a thread pool, so a separate
    # "exists?" check followed by makedirs can race; exist_ok makes the
    # creation safe to repeat.
    os.makedirs('movie_images', exist_ok=True)

    # Take the extension from the URL path only, so a query string or
    # fragment never ends up in the file name.
    file_ext = os.path.splitext(urlsplit(image_url).path)[1]

    # A path separator inside the title would otherwise be interpreted as a
    # sub-directory and make open() fail.
    safe_title = str(title)
    for separator in (os.sep, os.altsep):
        if separator:
            safe_title = safe_title.replace(separator, '_')

    target_path = os.path.join('movie_images', f'{safe_title}{file_ext}')
    with open(target_path, 'wb') as f:
        f.write(response.content)
    print(f"{title} 图片保存成功")
def download_single_movie(item):
    """Print a one-line summary of a movie entry and download its cover.

    ``item`` is indexed with the keys ``title``, ``url``, ``score`` and
    ``cover_url``, in that order; any lookup error propagates to the caller.
    The cover is saved through ``save_image`` under the movie's title.
    """
    name, page_url, rating, cover = (
        item[key] for key in ('title', 'url', 'score', 'cover_url')
    )
    print(f"标题: {name}, URL: {page_url}, 分数: {rating}")
    save_image(cover, name)
def main(url=None, max_workers=20):
    """Fetch the movie listing and download every cover image concurrently.

    Args:
        url: JSON endpoint to query. Defaults to the Douban movie chart
            endpoint (described by the original author as the Top 250
            ranking data API).
        max_workers: Upper bound on the number of worker threads.

    The timer starts after the listing has been fetched, so the reported
    duration covers only the concurrent processing phase. The duration line
    is printed even when no data was returned.
    """
    if url is None:
        url = 'https://movie.douban.com/j/chart/top_list?type=11&interval_id=100%3A90&action='

    # The response is expected to be a list of dicts; a sample entry has keys
    # such as "title", "url", "score", "cover_url", "rating", "rank", "id",
    # "types", "regions", "release_date", "actors" and "vote_count", e.g.
    # {"title": "肖申克的救赎", "score": "9.7",
    #  "url": "https://movie.douban.com/subject/1292052/",
    #  "cover_url": "https://img3.doubanio.com/view/photo/s_ratio_poster/public/p480747492.webp"}
    data = fetch_data(url)

    # perf_counter is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments.
    started = time.perf_counter()
    if data:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # One download_single_movie task per entry in the listing.
            futures = [executor.submit(download_single_movie, item) for item in data]
            for future in as_completed(futures):
                try:
                    # result() re-raises whatever the worker raised, which
                    # is the only reason it is called here.
                    future.result()
                except Exception as e:
                    # Boundary handler: one failed download must not stop
                    # the remaining ones.
                    print(f"发生错误: {e}")
    elapsed = time.perf_counter() - started
    print(f"并发处理总耗时:{elapsed:.2f} 秒")
# Run the scraper only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
最后编辑: 海马 文档更新时间: 2026-02-10 17:26 作者:海马