目录:
import requests
from fake_useragent import UserAgent
# Demo: send a request with a randomly chosen User-Agent header.
# `ua.random` supplies one User-Agent string from fake_useragent.
ua = UserAgent()
headers = {'User-Agent': ua.random}
# The sample output below shows the service returning the User-Agent it
# received, which makes it easy to confirm the header was applied.
r = requests.get('http://httpbin.org/user-agent', headers=headers)
print(r.json())
# Sample output: {'user-agent': 'Mozilla/5.0 (Macintosh; U; PPC Mac OS X; en-us) AppleWebKit/312.5.1 (KHTML, like Gecko) Safari/312.3.1'}
import requests
import urllib3
import ssl
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib3.util.ssl_ import create_urllib3_context
# Silence urllib3's InsecureRequestWarning (the request helper below sends
# requests with verify=False, which would otherwise warn on every call).
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class CustomSSLAdapter(HTTPAdapter):
    """Transport adapter that pins HTTPS connections to TLS 1.2.

    Mount an instance on a ``requests.Session`` so that every connection
    pool created by the adapter uses the SSL context built in
    ``init_poolmanager``.
    """

    def init_poolmanager(self, *args, **kwargs):
        """Build the pool manager with a TLS-1.2-only SSL context.

        All positional and keyword arguments are forwarded unchanged to
        ``HTTPAdapter.init_poolmanager``, except ``ssl_context``, which is
        always replaced by the context created here.
        """
        ctx = create_urllib3_context()
        # Disable the insecure legacy protocol versions.
        ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
        # Pin both bounds so that only TLS 1.2 can be negotiated.
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2
        ctx.maximum_version = ssl.TLSVersion.TLSv1_2
        # A context with check_hostname=True cannot be switched to CERT_NONE
        # (the ssl module raises ValueError), which is what happens when a
        # request is sent with verify=False through this adapter.
        # NOTE(review): urllib3 is expected to do its own hostname matching
        # when verification is on and check_hostname is off -- confirm
        # against the installed urllib3 version.
        ctx.check_hostname = False
        kwargs['ssl_context'] = ctx
        return super().init_poolmanager(*args, **kwargs)
def make_request(**kwargs):
    """Send one HTTP request through a short-lived, retry-enabled session.

    The keyword arguments are forwarded to ``requests.Session.request``,
    so ``method`` and ``url`` must be supplied by the caller.

    Defaults applied by this helper:

    * ``headers``: a base set (User-Agent, Accept, ``Connection: close``);
      caller-supplied headers are merged on top and win on conflict.
    * ``timeout``: ``(20, 120)`` (connect, read) unless the caller passes one.
    * ``verify``: ``False`` unless the caller passes one.
      SECURITY: certificate verification is off by default; pass
      ``verify=True`` when talking to hosts you need to authenticate.

    :return: the ``requests.Response`` on success, or ``None`` when the
        request fails (SSL error, connection error, timeout, or an HTTP
        error status reported by ``raise_for_status``).

    NOTE(review): the session is closed before returning, so a response
    requested with ``stream=True`` may no longer be readable -- confirm
    before using this helper for streaming.
    """
    # Local import/logger: no module-level ``logger`` is visible in this file.
    import logging

    logger = logging.getLogger(__name__)

    session = requests.Session()

    # Retry transient failures and the listed status codes with backoff.
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"],
    )

    # Route both schemes through the custom adapter.
    adapter = CustomSSLAdapter(max_retries=retry_strategy)
    session.mount('https://', adapter)
    session.mount('http://', adapter)

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json',
        'Connection': 'close',  # disable keep-alive
    }
    # ``headers`` is optional: a missing key must not raise KeyError.
    caller_headers = kwargs.get('headers')
    if caller_headers is not None:
        headers.update(caller_headers)
    kwargs['headers'] = headers

    # Only fill in defaults; never override what the caller asked for.
    kwargs.setdefault('timeout', (20, 120))
    kwargs.setdefault('verify', False)

    try:
        response = session.request(**kwargs)
        response.raise_for_status()  # turn 4xx/5xx into HTTPError
        return response
    except requests.exceptions.SSLError as e:
        logger.error(f"SSL错误: {e}")
        logger.error("建议:1. 联系服务器管理员检查SSL配置")
        logger.error(" 2. 确认服务器支持TLSv1.2或更高版本")
        logger.error(" 3. 检查中间网络设备(防火墙/代理)配置")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"请求异常: {e}")
        return None
    finally:
        session.close()
如果已经知道目标页面的编码,可以直接设置响应对象的 encoding 属性(response.encoding)来指定编码。
import requests
# Demo: when the page encoding is already known, set it on the response
# before reading `response.text`.
response = requests.get('http://example.com')
response.encoding = 'utf-8' # or 'gbk', 'gb2312', etc.
print(response.text)
在不知道编码的前提下,可以采用以下方法:
import requests
# Demo: when the page encoding is unknown, let set_encoding() pick one.
response = requests.get('http://example.com')
# NOTE: set_encoding is defined further down in this file; in a runnable
# script its definition must come before this call.
set_encoding(response)
def set_encoding(response):
    """Pick a text encoding for ``response`` and store it in ``response.encoding``.

    Sources are tried in this order, and the first one that yields a value
    wins:

    1. encodings declared inside the body itself (e.g. an HTML ``<meta>``
       tag), via ``requests.utils.get_encodings_from_content``;
    2. ``response.apparent_encoding`` (the library's guess from the bytes);
    3. the ``charset=`` parameter of the ``Content-Type`` response header.

    If none of them yields a value, ``response.encoding`` is left untouched.

    :param response: a ``requests.Response`` whose body has been received.
    :return: ``None``; the response is modified in place.
    """
    # NOTE(review): get_encodings_from_content is believed to be deprecated
    # in recent requests releases -- confirm before upgrading.
    encodings = requests.utils.get_encodings_from_content(response.text)
    if encodings:
        # Use the first encoding declared in the content.
        response.encoding = encodings[0]
    elif response.apparent_encoding:
        response.encoding = response.apparent_encoding
    else:
        # Example header: {'Content-Type': 'text/html; charset=utf-8'}
        content_type = response.headers.get('content-type', '')
        # Header parameter names are case-insensitive, and the value may be
        # quoted or followed by further ';'-separated parameters.
        _, marker, tail = content_type.lower().partition('charset=')
        if marker:
            encoding = tail.split(';', 1)[0].strip().strip('"\'')
            if encoding:
                response.encoding = encoding
import requests
from fake_useragent import UserAgent
def download_file(url, save_path=None, chunk_size=8192, show_progress=True, timeout=(20, 120)):
    """
    Download a file (e.g. an audio file) by streaming it to disk.

    :param url: URL of the file, for example 'https://example.com/song.mp3'
    :param save_path: local path to write to, for example 'song.mp3' or
        'audio/song.m4a'. When None, the last path segment of ``url``
        (without query string or fragment) is used; if that segment is
        empty, the name 'download' is used. A missing parent directory is
        created.
    :param chunk_size: number of bytes read per chunk, 8 KB by default
    :param show_progress: when true and the server reports a
        Content-Length, print an in-place progress line
    :param timeout: (connect, read) timeout in seconds passed to
        ``requests.get`` so a stalled server cannot block forever
    :return: ``save_path`` on success, ``None`` when a request error was
        caught and reported. A partially written file is not removed.
    """
    import os

    try:
        if save_path is None:
            # Drop the fragment and query string, then keep the last segment.
            url_path = url.split('#', 1)[0].split('?', 1)[0]
            # A URL ending in '/' has no file name; fall back to a fixed one
            # instead of failing later on open('').
            save_path = url_path.rsplit('/', 1)[-1] or 'download'

        # Use a random User-Agent for the request.
        ua = UserAgent()
        headers = {'User-Agent': ua.random}

        # stream=True defers the body download so it can be read in chunks;
        # the ``with`` block guarantees the connection is released.
        with requests.get(url, stream=True, headers=headers, timeout=timeout) as response:
            # Raise HTTPError for 4xx/5xx responses.
            response.raise_for_status()

            # Total size as reported by the server; 0 means unknown.
            total_size = int(response.headers.get('content-length', 0))

            parent_dir = os.path.dirname(save_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            downloaded_size = 0
            progress_shown = False
            with open(save_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    # Skip empty keep-alive chunks.
                    if not chunk:
                        continue
                    file.write(chunk)
                    downloaded_size += len(chunk)
                    # Progress is only meaningful when the total is known.
                    if show_progress and total_size > 0:
                        percent = (downloaded_size / total_size) * 100
                        # '\r' rewrites the same console line on each update.
                        print(f"\r下载进度: {percent:.2f}% ({downloaded_size}/{total_size} bytes)", end='')
                        progress_shown = True

            if progress_shown:
                # Finish the progress line so the next message starts cleanly.
                print()

        print(f'音频文件下载成功!保存在:{save_path}')
        return save_path
    except requests.exceptions.Timeout:
        print("请求超时")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP错误: {e}")
    except requests.exceptions.ConnectionError:
        print("网络连接错误")
    except requests.exceptions.RequestException as e:
        print(f'下载音频时发生错误:{e}')
    return None
示例:
# Usage examples for download_file().
file_url = 'https://example.com/song.mp3'
# Download the file; by default it is saved under the file name taken from file_url (song.mp3).
download_file(file_url)
# Save under a custom file name.
download_file(file_url, 'my_song.mp3')
# Do not show the progress output.
download_file(file_url, show_progress=False)
# Use a custom chunk size: the default is 8 KB per chunk; this call uses 30 KB.
download_file(file_url, chunk_size=1024 * 30)
↶ 返回首页 ↶