鱼C论坛

 找回密码
 立即注册
查看: 57|回复: 1

想请教下想爬取ytb数据,但是显示访问超时

[复制链接]
发表于 前天 10:40 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
想请教下,我用 ytbv3 官方 API 爬取数据时出现访问超时,已开启quanju代理,且环境测试脚本运行正常。请问有朋友能指导下如何解决吗?


爬取脚本
  1. from googleapiclient.discovery import build
  2. import pandas as pd
  3. from datetime import datetime
  4. import os

  5. # -------------------------
  6. # 1. 配置参数
  7. # -------------------------
  8. API_KEY = aaaaaaaaaaaa  # 替换成你的 YouTube Data API Key
  9. PLAYLIST_ID = "aaaaaaaaaaaa"  # 替换成目标播放列表ID
  10. OUTPUT_FILE = "youtube_playlist_stats.xlsx"

  11. # -------------------------
  12. # 2. 初始化 YouTube API
  13. # -------------------------
  14. youtube = build("youtube", "v3", developerKey=API_KEY)

  15. # -------------------------
  16. # 3. 获取播放列表中的视频ID
  17. # -------------------------
  18. video_ids = []
  19. next_page_token = None

  20. while True:
  21.     playlist_request = youtube.playlistItems().list(
  22.         part="contentDetails",
  23.         playlistId=PLAYLIST_ID,
  24.         maxResults=50,
  25.         pageToken=next_page_token
  26.     )
  27.     playlist_response = playlist_request.execute()

  28.     for item in playlist_response["items"]:
  29.         video_ids.append(item["contentDetails"]["videoId"])

  30.     next_page_token = playlist_response.get("nextPageToken")
  31.     if not next_page_token:
  32.         break

  33. print(f"📋 播放列表共找到 {len(video_ids)} 个视频")

  34. # -------------------------
  35. # 4. 获取视频统计数据
  36. # -------------------------
  37. all_data = []
  38. crawl_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

  39. for i in range(0, len(video_ids), 50):
  40.     batch = video_ids[i:i+50]
  41.     video_request = youtube.videos().list(
  42.         part="statistics,snippet",
  43.         id=",".join(batch)
  44.     )
  45.     video_response = video_request.execute()

  46.     for item in video_response["items"]:
  47.         stats = item["statistics"]
  48.         snippet = item["snippet"]

  49.         all_data.append({
  50.             "抓取时间": crawl_time,
  51.             "视频标题": snippet["title"],
  52.             "视频ID": item["id"],
  53.             "播放量": int(stats.get("viewCount", 0)),
  54.             "点赞数": int(stats.get("likeCount", 0)),
  55.             "评论数": int(stats.get("commentCount", 0))
  56.         })

  57. # -------------------------
  58. # 5. 保存到 Excel
  59. # -------------------------
  60. df_new = pd.DataFrame(all_data)

  61. if os.path.exists(OUTPUT_FILE):
  62.     df_old = pd.read_excel(OUTPUT_FILE)
  63.     df_all = pd.concat([df_old, df_new], ignore_index=True)
  64. else:
  65.     df_all = df_new

  66. df_all.to_excel(OUTPUT_FILE, index=False)

  67. print(f"✅ 数据已保存到 {OUTPUT_FILE}")
复制代码








环境测试脚本a
  1. import requests

  2. try:
  3.     # 访问 YouTube Data API 测试 URL
  4.     r = requests.get("通过自己的apikey获取某个ytb视频的链接,目前还没有权限发链接", timeout=10)
  5.     print(r.status_code, r.text[:200])
  6. except Exception as e:
  7.     print("访问失败:", e)
复制代码

小甲鱼最新课程 -> https://ilovefishc.com
回复

使用道具 举报

发表于 前天 10:42 | 显示全部楼层
您好!看到您在使用 YouTube Data API v3 时遇到访问超时问题,虽然已开启全局代理且环境测试正常,但爬取脚本仍超时。我来帮您分析一下可能的原因和解决方案。

首先,您的爬取脚本中有几处代码错误需要修复,这些错误可能导致请求无法正常发送。以下是修正后的代码关键部分:


  1. # 初始化 video_ids 为空列表
  2. video_ids = []

  3. # 获取播放列表中的视频ID
  4. next_page_token = None
  5. while True:
  6.     playlist_request = youtube.playlistItems().list(
  7.         part="contentDetails",
  8.         playlistId=PLAYLIST_ID,
  9.         maxResults=50,
  10.         pageToken=next_page_token
  11.     )
  12.     playlist_response = playlist_request.execute()
  13.    
  14.     # 正确提取 items
  15.     items = playlist_response.get('items', [])
  16.     for item in items:
  17.         video_id = item['contentDetails']['videoId']
  18.         video_ids.append(video_id)  # 添加视频ID到列表
  19.    
  20.     next_page_token = playlist_response.get('nextPageToken')
  21.     if not next_page_token:
  22.         break

  23. print(f"找到 {len(video_ids)} 个视频")

  24. # 获取视频统计数据
  25. all_data = []
  26. crawl_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

  27. # 分批处理视频ID,每批50个
  28. for i in range(0, len(video_ids), 50):
  29.     batch = video_ids[i:i+50]  # 正确切片
  30.     video_request = youtube.videos().list(
  31.         part="statistics,snippet",
  32.         id=",".join(batch)
  33.     )
  34.     video_response = video_request.execute()
  35.     items = video_response.get('items', [])
  36.    
  37.     for item in items:
  38.         stats = item.get('statistics', {})
  39.         snippet = item.get('snippet', {})
  40.         all_data.append({
  41.             "抓取时间": crawl_time,
  42.             "视频标题": snippet.get('title', 'N/A'),
  43.             "视频ID": item.get('id', 'N/A'),
  44.             "播放量": int(stats.get("viewCount", 0)),
  45.             "点赞数": int(stats.get("likeCount", 0)),
  46.             "评论数": int(stats.get("commentCount", 0))
  47.         })

  48. # 保存到Excel(修正concat部分)
  49. if os.path.exists(OUTPUT_FILE):
  50.     df_old = pd.read_excel(OUTPUT_FILE)
  51.     df_all = pd.concat([df_old, df_new], ignore_index=True)  # 添加缺失参数
  52. else:
  53.     df_all = df_new
  54. df_all.to_excel(OUTPUT_FILE, index=False)
复制代码


### 解决超时问题的建议:
1. 代理设置问题:虽然您开启了全局代理,但 googleapiclient 可能不会自动使用系统代理。建议在代码中显式设置代理。例如,使用 requests 库的会话对象来配置代理,然后传递给 build 函数。示例:
  

  1.    import requests
  2.    from googleapiclient.discovery import build
  3.    from googleapiclient.http import HttpRequest

  4.    # 创建带代理的 HTTP 对象
  5.    proxies = {
  6.        "http": "[url=http://your-proxy-address:port",]http://your-proxy-address:port",[/url]  # 替换为您的代理地址和端口
  7.        "https": "[url=http://your-proxy-address:port"]http://your-proxy-address:port"[/url]
  8.    }
  9.    session = requests.Session()
  10.    session.proxies = proxies
  11.    http = HttpRequest(session=session)

  12.    # 初始化 YouTube API 时传入 http 对象
  13.    youtube = build("youtube", "v3", developerKey=API_KEY, http=http)
  14.    
复制代码

   注意:将 your-proxy-address:port 替换为实际代理信息。

2. 添加超时和重试机制:API 请求可能因网络波动超时,建议在请求中添加超时参数和重试逻辑。例如:
  

  1.    from tenacity import retry, stop_after_attempt, wait_exponential

  2.    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
  3.    def make_request(request):
  4.        return request.execute()
  5.    
  6.    # 在调用 request.execute() 的地方使用 make_request(playlist_request)
  7.    
复制代码

   首先安装 tenacity 库: pip install tenacity

3. 检查 API 配额和限制:确保您的 API key 没有超过每日配额或速率限制。您可以在 Google Cloud Console 中查看使用情况。

4. 网络测试:运行环境测试脚本时,确认使用的是相同代理设置。如果测试脚本正常,但主脚本超时,可能是代码逻辑或代理未生效导致。

如果问题依旧,请检查代理稳定性或尝试更换代理节点。希望这能帮到您!如果还有其他问题,欢迎继续问。

以上回复来自 FishC AI 小助理,如未能正确解答您的问题,请继续追问。
小甲鱼最新课程 -> https://ilovefishc.com
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2025-9-10 17:58

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表