今天我们抓取的是m3u8的视频,视频有长视频和短视频之分.
抓取m3u8类型视频
对于短视频
一般来说一个视频对应的就是一个url
长视频
一个视频就几百兆到几十G不等 这种视频往往不会由一个连接进行全部返回 视频是由多个片段组成的每个片段大概是几秒到几分钟
所以对于长视频的下载没有办法通过一个url进行获取下载,需要获取每个频段的url 进行下载进行拼接最终变成一个完整的视频
然而视频还有加密之分
我们使用合并视频的工具 ffmpeg ,如果没有加密的视频,直接根据每个片段的url进行下载拼接即可 如果有加密的视频 根据加密的地址 请求加密的key,然后在进行合并,FFmpeg会自动进行解密
网站链接:下面就会有网站链接,我就不放了
第一次响应的地址: /20211213/fl6Q2MC2/1100kb/hls/index.m3u8
第二次请求m3u8:https://v4.cdtlas.com/20211213/fl6Q2MC2/1100kb/hls/index.m3u8
第一次请求m3u8:https://v4.cdtlas.com/20211213/fl6Q2MC2/index.m3u8
也就是是说第一次请求的域名加上响应就是第二次的地址
第二次的响应情况如下
找的页面里第一个m3u8的地址src="/js/player/?url=https://v4.cdtlas.com/20211213/fl6Q2MC2/index.m3u8&id=60825&num=1&count=8&vt=1"
导包
import requests
import re
url = 'https://www.wbdyba.com/play/60825_1_1.html'
headers = {
# 'Referer': 'https://www.wbdyba.com/play/60825_1_1.html',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78'
}
# 把网页源代码下载到本地
res = requests.get(url, headers)
# with open('The_Outcast.html', 'wb')as f:
# f.write(res.content)
with open('The_Outcast.html','r',encoding='utf-8')as f:
data = f.read()
base_m3u8_url = re.search(
'<iframe id="mplay" name="mplay" src="/js/player/?url=(.*?)" '
'allowfullscreen="allowfullscreen" mozallowfullscreen="mozallowfullscreen" msallowfullscreen="msallowfullscreen" oallowfullscreen="oallowfullscreen" webkitallowfullscreen="webkitallowfullscreen" frameBorder="no" width="100%" scrolling="no" height="100%">',data).group(1)
m3u8_url = base_m3u8_url.split('&')[0]
在这里我为什么要切片切出来呢?因为源码正则得到的m3u8地址不可用
# https://v4.cdtlas.com/20211213/fl6Q2MC2/index.m3u8&id=60825&num=1&count=8&vt=1 源码正则得到的
# https://v4.cdtlas.com/20211213/fl6Q2MC2/index.m3u8 网页发送请求的
当我在得到这两个url的时候,我也不确定哪一个能请求到真正的视频资源
所以我在下面用了两中验证来确定了,我们要的是浏览器能请求的m3u8地址
res1= requests.get('https://v4.cdtlas.com/20211213/fl6Q2MC2/index.m3u8&id=60825&num=1&count=8&vt=1',headers=headers)
res2 = requests.get('https://v4.cdtlas.com/20211213/fl6Q2MC2/index.m3u8',headers=headers)
print(res1.content.decode())
print(res2.content.decode())
if (res1==res2):
print('success')
else:
print('false')
结果如下
D:\Tools\Tool\python\python.exe D:/Tools/Workspace/PyWorkspace/dateProject/demo08-下载m3u8视频/一人之下.py
file not found
#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=1100000,RESOLUTION=864x486
/20211213/fl6Q2MC2/1100kb/hls/index.m3u8
false
进行m3u8第一次请求 目的为了获取第二次请求的m3U8的地址
res1 = requests.get(m3u8_url,headers=headers)
with open('index1.m3u8', 'wb', )as f:
f.write(res1.content)
with open('index1.m3u8', 'r', encoding='utf-8')as f:
data_index = f.readlines()[-1].strip()
# print(data_index)
'''
https://v4.cdtlas.com/20211213/fl6Q2MC2/1100kb/hls/index.m3u8
'''
# 拼接第二次请求的url 获得第二次请求的m3u8文件
url2 = urljoin(m3u8_url, data_index)
res2 = requests.get(url2,headers = headers)
with open('index2.m3u8', 'wb', )as f:
f.write(res2.content)
封装得到m3u8的方法
def get_m3u8(url):
res = requests.get(url, headers)
base_m3u8_url = re.search(
'<iframe id="mplay" name="mplay" src="/js/player/?url=(.*?)" '
'allowfullscreen="allowfullscreen" mozallowfullscreen="mozallowfullscreen" msallowfullscreen="msallowfullscreen" oallowfullscreen="oallowfullscreen" webkitallowfullscreen="webkitallowfullscreen" frameBorder="no" width="100%" scrolling="no" height="100%">',
res.content.decode()).group(1)
m3u8_url = base_m3u8_url.split('&')[0]
# 进行m3u8第一次请求 目的为了获取第二次请求的m3U8的地址
res1 = requests.get(m3u8_url, headers=headers)
with open('index1.m3u8', 'wb', )as f:
f.write(res1.content)
with open('index1.m3u8', 'r', encoding='utf-8')as f:
data_index = f.readlines()[-1].strip()
# print(data_index)
'''
https://v4.cdtlas.com/20211213/fl6Q2MC2/1100kb/hls/index.m3u8
'''
# 拼接第二次请求的url 获得第二次请求的m3u8文件
url2 = urljoin(m3u8_url, data_index)
res2 = requests.get(url2, headers=headers)
with open('index2.m3u8', 'wb', )as f:
f.write(res2.content)
return url2
下载所有的ts
def download_all():
# 下载存储ts文件的路径
path = 'ts'
if not os.path.exists(path):
os.mkdir(path)
with open('index2.m3u8', 'r', encoding='utf-8')as f:
data = f.readlines()
# print(data)
for i in data:
'''
这里还可以协程
if l.startwith('#'):
continue
'''
if (i.startswith('https')):
# 取出url结尾的换行符
ts_url = i.strip()
down_one(ts_url, path)
下载单个ts
def down_one(ts_url, path):
print(ts_url, '===正在下载')
'''
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/Vuzb3l70.ts
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/0HiRyylQ.ts
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/Z6W1fh5y.ts
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/u9ZW3cN2.ts
'''
name = ts_url.split('/')[-1]
res = requests.get(ts_url, headers=headers)
# 进行下载储存
with open(os.path.join(path, name), 'wb')as f:
f.write(res.content)
from concurrent.futures import ThreadPoolExecutor,wait
def down_one(ts_url, path):
while 1:
try:
print(ts_url, '===正在下载')
'''
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/Vuzb3l70.ts
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/0HiRyylQ.ts
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/Z6W1fh5y.ts
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/u9ZW3cN2.ts
'''
name = ts_url.split('/')[-1]
# 请求ts的url如果60秒还没下载完进行重新下载
res = requests.get(ts_url, headers=headers,timeout=60)
# 进行下载储存
with open(os.path.join(path, name), 'wb')as f:
f.write(res.content)
print(ts_url, '===下载成功')
break
except:
print(url,'下载失败,重新在下载')
def download_all():
# 下载存储ts文件的路径
path = 'ts'
if not os.path.exists(path):
os.mkdir(path)
with open('index2.m3u8', 'r', encoding='utf-8')as f:
data = f.readlines()
# print(data)
#创建线程池
pool = ThreadPoolExecutor(100)
tasks = []# 装任务的列表
for i in data:
'''
这里还可以协程
if l.startwith('#'):
continue
'''
if (i.startswith('https')):
# 取出url结尾的换行符
ts_url = i.strip()
# 添m任务到线程池
tasks.append(pool.submit(down_one(ts_url, path)))
# 阻塞等地 所有都下载完毕再向下执行
wait(tasks)
# 处理下载后ts文件和m3u8文件的对应关系
def do_m3u8_url():
with open('index2.m3u8', 'r', encoding='utf-8')as f:
data = f.readlines()
f = open(os.path.join(path, 'index3.m3u8'), 'w', encoding='UTF-8')
for i in data:
if i.startswith('#'):
# 这一步操作是把index3.m3u8文件里的key换成我们自己的
f.write(i)
if i.find('URI')!=-1:
new_i = i.replace('https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/key.key','key.m3u8')
f.write(new_i)
else:
f.write(i.split('/')[-1])
def merge(filename='output'):
'''
进行ts文件合并 解决视频音频不同步的问题 建议使用这种
:param filePath:
:return:
'''
os.chdir(path)
cmd = f'D:/Tools/Tool/ffmpeg-4.4.1-essentials_build/bin/ffmpeg -i index.m3u8 -c copy {filename}.mp4'
os.system(cmd)
# 下载密钥
def get_key():
resp = requests.get('https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/key.key')
f = open(os.path.join(path, 'key.m3u8'), 'w', encoding='UTF-8')
f.write(resp.text)
if __name__ == '__main__':
url = 'https://www.wbdyba.com/play/60825_1_1.html'
headers = {
'Referer': 'https://www.wbdyba.com/play/60825_1_1.html',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78'
}
# 下载存储ts文件的路径
path = 'ts'
# 获取m3u8文件内容也就是所有的ts 的url
get_m3u8(url)
download_all()
do_m3u8_url()
# 进行合并处理
merge()
import os
import re
from urllib.parse import urljoin
import requests
from aiohttp import ClientSession, TCPConnector
import asyncio
import aiofiles
async def get_m3u8(url):
async with ClientSession(connector=TCPConnector(ssl=False), headers=headers) as session:
async with session.get(url)as resp:
data = await resp.text(encoding='utf-8')
base_m3u8_url = re.search('<iframe id="mplay" name="mplay" src="/js/player/?url=(.*?)" '
'allowfullscreen="allowfullscreen" mozallowfullscreen="mozallowfullscreen" msallowfullscreen="msallowfullscreen" oallowfullscreen="oallowfullscreen" webkitallowfullscreen="webkitallowfullscreen" frameBorder="no" width="100%" scrolling="no" height="100%">'
, data).group(1)
m3u8_url = base_m3u8_url.split('&')[0]
async with session.get(m3u8_url)as resp:
data = await resp.text(encoding='utf-8')
# 进行m3u8第一次请求 目的为了获取第二次请求的m3U8的地址
async with aiofiles.open('index.m3u8', 'w', encoding='utf-8')as f:
await f.write(data)
async with aiofiles.open('index.m3u8', 'r', encoding='utf-8')as f:
data_index = await f.readlines()
# 进行第二次url请求 url的获取
data_index = data_index[-1].strip()
# print(data_index)
'''
https://v4.cdtlas.com/20211213/fl6Q2MC2/1100kb/hls/index.m3u8
'''
# 拼接第二次请求的url 获得第二次请求的m3u8文件
url2 = urljoin(m3u8_url, data_index)
async with session.get(url2)as resp:
data = await resp.text(encoding='utf-8')
async with aiofiles.open('index.m3u8', 'w', encoding='utf-8')as f:
await f.write(data)
return url2
async def down_one(ts_url, path, sem):
while 1:
# 使用信号量控制并发
async with sem:
try:
# 请求ts的url如果60秒还没下载完进行重新下载
async with ClientSession(connector=TCPConnector(ssl=False), headers=headers)as session:
print(ts_url, '===正在下载')
name = ts_url.split('/')[-1]
'''
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/Vuzb3l70.ts
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/0HiRyylQ.ts
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/Z6W1fh5y.ts
https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/u9ZW3cN2.ts
'''
async with session.get(ts_url, timeout=60)as resp:
data = await resp.read()
# 进行下载储存
async with aiofiles.open(os.path.join(path, name), 'wb')as f:
await f.write(data)
print(ts_url, '===下载成功')
break
except:
print(ts_url, '下载失败,重新在下载')
async def download_all():
with open('index.m3u8', 'r', encoding='utf-8')as f:
data = f.readlines()
if not os.path.exists(path):
os.mkdir(path)
# print(data)
# 创建线程池
sem = asyncio.Semaphore(200)
tasks = [] # 装任务的列表
for i in data:
'''
这里还可以协程
if l.startwith('#'):
continue
'''
if (i.startswith('https')):
# 取出url结尾的换行符
ts_url = i.strip()
# 添m任务到线程池
tasks.append(asyncio.create_task(down_one(ts_url, path, sem)))
# 阻塞等地 所有都下载完毕再向下执行
await asyncio.wait(tasks)
# 处理下载后ts文件和m3u8文件的对应关系
def do_m3u8_url():
with open('index.m3u8', 'r', encoding='utf-8')as f:
data = f.readlines()
f = open(os.path.join(path, 'index.m3u8'), 'w', encoding='UTF-8')
for i in data:
if i.startswith('#'):
# 这一步操作是把index.m3u8文件里的key换成我们自己的
f.write(i)
if i.find('URI')!=-1:
new_i = i.replace('https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/key.key','key.m3u8')
f.write(new_i)
else:
f.write(i.split('/')[-1])
# 下载密钥
def get_key():
resp = requests.get('https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/key.key')
f = open(os.path.join(path, 'key.m3u8'), 'w', encoding='UTF-8')
f.write(resp.text)
def merge(filename='output'):
'''
进行ts文件合并 解决视频音频不同步的问题 建议使用这种
:param filePath:
:return:
'''
os.chdir(path)
cmd = f'D:/Tools/Tool/ffmpeg-4.4.1-essentials_build/bin/ffmpeg -i index.m3u8 -c copy {filename}.mp4'
os.system(cmd)
async def main():
# 获取m3u8文件内容也就是所有的ts 的url
task = asyncio.create_task(get_m3u8(url))
await asyncio.gather(task)
task = asyncio.create_task(download_all())
await asyncio.gather(task)
do_m3u8_url()
get_key()
# 进行合并处理
merge()
if __name__ == '__main__':
url = 'https://www.wbdyba.com/play/60825_1_1.html'
headers = {
'Referer': 'https://www.wbdyba.com/play/60825_1_1.html',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78'
}
# 下载存储ts文件的路径
path = 'ts'
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# 下载密钥
def get_key():
resp = requests.get('https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/key.key')
f = open(os.path.join(path, 'key.key'), 'w', encoding='UTF-8')
f.write(resp.text)
if i.find('URI')!=-1:
new_i = i.replace('https://hey07.cjkypo.com/20211213/fl6Q2MC2/1100kb/hls/key.key','key.m3u8')
f.write(new_i)
else:
这里我直接使用的取巧的方法了,直接使用查找字符串并改变字符串
写完这个方法后在main函数内调用一次就好了
在我们请求到m3u8地址时,源码内抓到的跟浏览器抓包工具里抓到的不一样,我们就需要把从源码得到的地址转成浏览器一样的地址,如果遇到ts文件中带key的要把他解密放在ts文件夹里.在密钥解密这里需要注意一点
ts文件夹的结构应该为
project/
ts/
0.ts
1.ts
...
index.m3u8
key.m3u8
密钥内容为key.m3u8
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:1
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-KEY:METHOD=AES-128,URI="key.m3u8"
#EXTINF:1.235,
0.ts
#EXTINF:1.001,
1.ts
#EXTINF:1.001,
2.ts
我只抓取了第一集,小伙伴像抓取其他集,只需要用一个for循环输入不同的url即可