[Python Web Scraping] Crawler Recipes

2024-01-26 00:00:00


Generate a random User-Agent

import requests
from fake_useragent import UserAgent

ua = UserAgent()
headers = {'User-Agent': ua.random}
r = requests.get('http://httpbin.org/user-agent', headers=headers)
print(r.json())
# Output: {'user-agent': 'Mozilla/5.0 (Macintosh; U; PPC Mac OS X; en-us) AppleWebKit/312.5.1 (KHTML, like Gecko) Safari/312.3.1'}
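fake_useragent can also be restricted to a particular browser family (ua.chrome, ua.firefox, etc.), and when reusing a Session a fresh random UA can be set before each request. A minimal sketch, still targeting the httpbin endpoint above:

import requests
from fake_useragent import UserAgent

ua = UserAgent()
session = requests.Session()

for _ in range(3):
    # Rotate the User-Agent on every request; ua.chrome / ua.firefox would
    # restrict the choice to one browser family instead of ua.random
    session.headers['User-Agent'] = ua.random
    print(session.get('http://httpbin.org/user-agent').json())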

requests raises SSLError

When requests fails with an SSLError, pinning the TLS version and adding a retry policy through a custom HTTPAdapter often resolves it:

import logging
import ssl

import requests
import urllib3
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib3.util.ssl_ import create_urllib3_context

logger = logging.getLogger(__name__)

# Suppress the InsecureRequestWarning triggered by verify=False below
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class CustomSSLAdapter(HTTPAdapter):
    """Custom SSL adapter that pins the TLS configuration"""
    def init_poolmanager(self, *args, **kwargs):
        ctx = create_urllib3_context()
        # Disable insecure protocol versions
        ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
        # Force TLSv1.2
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2
        ctx.maximum_version = ssl.TLSVersion.TLSv1_2
        kwargs['ssl_context'] = ctx
        return super().init_poolmanager(*args, **kwargs)


def make_request(**kwargs):
    """Wrapper around session.request with retries and the custom SSL adapter"""
    session = requests.Session()
    
    # Configure the retry strategy
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"]
    )
    
    # Mount the custom adapter
    adapter = CustomSSLAdapter(max_retries=retry_strategy)
    session.mount('https://', adapter)
    session.mount('http://', adapter)
    
    # Default request headers
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json',
        'Connection': 'close',  # disable keep-alive
    }

    # Merge any caller-supplied headers on top of the defaults
    if kwargs.get('headers'):
        headers.update(kwargs['headers'])

    kwargs['headers'] = headers
    kwargs['timeout'] = (20, 120)  # (connect, read) timeouts in seconds
    kwargs['verify'] = False       # skip certificate verification
    
    try:
        response = session.request(**kwargs)
        response.raise_for_status()  # Raise on HTTP error status codes
        return response
    except requests.exceptions.SSLError as e:
        logger.error(f"SSL error: {e}")
        logger.error("Suggestions: 1. Ask the server admin to check the SSL configuration")
        logger.error("             2. Confirm the server supports TLSv1.2 or later")
        logger.error("             3. Check intermediate devices (firewall/proxy) on the path")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {e}")
        return None
    finally:
        session.close()
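A possible way to call the wrapper (the URL and payload below are placeholders, not part of the original recipe):

import logging

logging.basicConfig(level=logging.INFO)

# GET with extra headers merged on top of the defaults
resp = make_request(method='GET', url='https://example.com/api/items',
                    headers={'Accept-Language': 'zh-CN'})
if resp is not None:
    print(resp.status_code)

# POST with a JSON body; omitting headers falls back to the defaults
resp = make_request(method='POST', url='https://example.com/api/items',
                    json={'name': 'test'})

Because the wrapper passes **kwargs straight to session.request, method and url are required, and anything else requests accepts (params, data, json, proxies, ...) is passed through unchanged.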

Garbled Chinese text

If you already know the encoding of the target page, set it explicitly through the response's encoding attribute.

import requests

response = requests.get('http://example.com')
response.encoding = 'utf-8'  # or 'gbk', 'gb2312', etc.
print(response.text)
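When the encoding is known, an equivalent option is to decode the raw bytes yourself instead of touching response.encoding. A small sketch, still using the placeholder URL above:

import requests

response = requests.get('http://example.com')
# response.content holds the undecoded bytes; decode them explicitly
html = response.content.decode('utf-8', errors='replace')
print(html[:200])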

If the encoding is not known in advance, the following approach can be used:

import requests

def set_encoding(response):
    """
    Detect and set the response encoding
    """
    # requests.utils.get_encodings_from_content extracts encodings declared
    # inside the HTML itself (e.g. in <meta> tags)
    encodings = requests.utils.get_encodings_from_content(response.text)
    if encodings:
        # Use the first encoding found
        response.encoding = encodings[0]
    elif response.apparent_encoding:
        # requests can guess the encoding from the response body
        # (backed by charset_normalizer / chardet)
        response.encoding = response.apparent_encoding
    else:
        # Some sites declare the encoding in the response headers, e.g.
        # { 'Content-Type': 'text/html; charset=utf-8' }, so extract it
        # from the header value directly
        content_type = response.headers.get('content-type', '')
        if 'charset=' in content_type:
            encoding = content_type.split('charset=')[-1]
            response.encoding = encoding

response = requests.get('http://example.com')
set_encoding(response)
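For the header-based branch, requests also ships a helper that parses the charset out of Content-Type directly. A short sketch, with the caveat that it falls back to ISO-8859-1 for text/* responses that declare no charset:

import requests

response = requests.get('http://example.com')

# Encoding declared in the Content-Type header, if any; this is the same
# helper requests uses internally to populate response.encoding
header_encoding = requests.utils.get_encoding_from_headers(response.headers)

# Encoding guessed from the response body
print(header_encoding, response.apparent_encoding)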

Download media files

Request the file with stream=True and write it to disk in chunks, so large files are never held in memory all at once:

import requests
from fake_useragent import UserAgent

def download_file(url, save_path=None, chunk_size=8192, show_progress=True):
    """
    Download a media file
    :param url: URL of the file, e.g. 'https://example.com/song.mp3'
    :param save_path: local save path, e.g. 'song.mp3' or 'audio/song.m4a';
                      defaults to the file name taken from the URL
    :param chunk_size: size of each chunk read (bytes), default 8 KB
    :param show_progress: whether to print download progress
    """
    try:
        # Derive the file name from the URL if no save path was given
        if save_path is None:
            save_path = url.split('?')[0].split('/')[-1]
        # Build a random User-Agent
        ua = UserAgent()
        headers = {'User-Agent': ua.random}
        # stream=True: do not download the whole body immediately,
        # prepare to read it as a stream instead; the timeout lets the
        # Timeout handler below actually trigger
        response = requests.get(url, stream=True, headers=headers, timeout=(20, 120))
        # Raise HTTPError for non-2xx status codes
        response.raise_for_status()
        # Try to read the total file size from the response headers
        total_size = int(response.headers.get('content-length', 0))
        downloaded_size = 0
        with open(save_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=chunk_size):
                # Skip keep-alive chunks that may be empty
                if chunk:
                    file.write(chunk)
                    downloaded_size += len(chunk)
                    # If the total size is known, compute and show progress
                    if total_size > 0 and show_progress:
                        percent = (downloaded_size / total_size) * 100
                        # \r rewrites the progress line in place
                        print(f"\rProgress: {percent:.2f}% ({downloaded_size}/{total_size} bytes)", end='')
        print(f'\nDownload finished! Saved to: {save_path}')
    except requests.exceptions.Timeout:
        print("Request timed out")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error: {e}")
    except requests.exceptions.ConnectionError:
        print("Network connection error")
    except requests.exceptions.RequestException as e:
        print(f'Error while downloading the file: {e}')

Example:

file_url = 'https://example.com/song.mp3'

# Download; by default the file name from file_url is used, i.e. song.mp3
download_file(file_url)

# Download with a custom file name
download_file(file_url, 'my_song.mp3')

# Download without showing the progress bar
download_file(file_url, show_progress=False)

# Custom chunk size: the default is 8 KB per chunk, here 30 KB per chunk
download_file(file_url, chunk_size=1024 * 30)
