Bilibili浏览历史获取器
Nov 6, 2023
·
13 mins read
介绍
这个脚本允许您获取并存储在本地 SQLite 数据库中的 Bilibili 浏览历史记录。它包括使用二维码扫描进行身份验证的登录机制。以下是一些使用脚本的重要注意事项和说明:
脚本需要完善
- 无前端页面
- 无远端同步
- 制作周期短
- 脚本未完整: v0.9.0
重要注意事项
- 使用注意: 避免过度使用此脚本,以防止暂时封禁。
- 数据存储: 超过三个月的浏览数据不会远程保存。此脚本设计用于永久数据存储。
- 手动输入: 在 ‘cookies’ 变量中手动输入从扫描二维码获取的会话数据。
配置
在运行脚本之前,请根据您的需求修改以下参数:
# 用您的实际 SESSDATA 值替换 '26...'
cookies = {'SESSDATA': '26'}
# 脚本执行间隔时间(单位:天)
daily_time = 1
# 网络问题的重试和跳过限制
retry_limit = 3
skip_limit = 3
# 脚本在中断时的行为
keepWhile = 1 # 0: 禁用, 1: 启用, 2: 无数据(自动)
依赖项
确保已安装所需的依赖项,运行:
pip install Pillow qrcode[pil] fake_useragent qrcode-terminal requests
运行脚本
通过运行以下命令执行脚本:
python integrated_code.py
这将根据您的会话状态,获取您的 Bilibili 历史记录或启动带有二维码的登录过程。
注意:如果脚本文件名不同,请将 ‘integrated_code.py’ 替换为实际的脚本文件名。
注意事项
- 该脚本使用 SQLite 进行本地数据存储。
- 它获取并存储您的 Bilibili 浏览历史记录。
- 登录过程涉及扫描二维码(需手动配置重启)。
- 较多支持 windows 和 linux。
根据您的需求随意定制脚本。
源码
# integrated_code.py
from PIL import Image
from io import BytesIO
import os
import http.cookiejar as cookielib
import qrcode
import random
from fake_useragent import UserAgent
from qrcode_terminal import draw # Import for Linux
import requests
import sqlite3
from datetime import datetime, timezone, timedelta
from time import sleep as time_sleep
## 注意事项
### 不要过量使用本脚本,注意防止被短暂封禁
### 三个月以外浏览数据在远端不保存,本脚本用于永久保存数据
### 使用方法:手动将扫码获取到的sessiondata填入cookies
## 一定修改
### Replace '26...' with your actual SESSDATA value
cookies = {'SESSDATA': '26'}
## 按需更改
daily_time = 1 # 脚本执行间隔时间(单位:天)
#re_check = 0 # 数据维护(不建议开启)0:关闭 1:开启 3:清者(配合keepWhile使用)
### 网络不好时修改
retry_limit = 3 # 重试次数限制
skip_limit = 3 # 连续跳过次数限制
### 如果脚本异常中断,使用以便一直获取数据直到最后
keepWhile = 1 # 0:关闭 1:开启 2:无数据(自动)
## 不要修改
url = 'https://api.bilibili.com/x/web-interface/history/cursor' # 历史记录获取api
initial_params = {'ps': '30', 'max': '0', 'view_at': '0', 'type': 'all'} # 历史记录获取api初识参数
## ======================================================= ##
def save_login_info(username, login_status, cookies):
# 插入登录信息到表中
cursor.execute('''
INSERT INTO bilibili_login_info (username, login_status, cookies)
VALUES (?, ?, ?)
''', (username, login_status, cookies))
conn.commit()
class ShowPng:
def __init__(self, data):
self.data = data
def show(self):
try:
if os.name == 'posix': # Check if the OS is Linux
img = Image.open(BytesIO(self.data))
draw(img) # Display ASCII art on Linux
elif os.name == 'nt': # Check if the OS is Windows
img_path = 'temp_qr_code.png'
with open(img_path, 'wb') as img_file:
img_file.write(self.data)
os.system(f'start {img_path}') # Open the image with the default viewer on Windows
except Exception as e:
print("Displaying image:")
print(repr(self.data))
# agent.py
def get_user_agents():
user_agent = UserAgent()
return user_agent.random
# bzlogin.py
def is_login(session):
try:
session.cookies.load(ignore_discard=True)
except Exception:
pass
login_url = session.get("https://api.bilibili.com/x/web-interface/nav", verify=False, headers=headers).json()
if login_url['code'] == 0:
print('Cookies值有效,', login_url['data']['uname'], ',已登录!')
return session, True
else:
print('Cookies值已经失效,请重新扫码登录!')
return session, False
def bz_login():
if not os.path.exists('bzcookies.txt'):
with open("bzcookies.txt", 'w') as f:
f.write("")
session = requests.session()
session.cookies = cookielib.LWPCookieJar(filename='bzcookies.txt')
session, status = is_login(session)
if not status:
get_login = session.get('https://passport.bilibili.com/qrcode/getLoginUrl', headers=headers).json()
login_url = requests.get(get_login['data']['url'], headers=headers).url
oauth_key = get_login['data']['oauthKey']
qr = qrcode.QRCode(box_size=10)
qr.add_data(login_url)
img = qr.make_image()
qr.print_ascii(out=None,tty=False,invert=False)
a = BytesIO()
img.save(a, 'png')
png = a.getvalue()
a.close()
viewer = ShowPng(png)
viewer.show()
token_url = 'https://passport.bilibili.com/qrcode/getLoginInfo'
while True:
qr_code_data = session.post(token_url, data={'oauthKey': oauth_key, 'gourl': 'https://www.bilibili.com/'}, headers=headerss).json()
print(qr_code_data)
if '-4' in str(qr_code_data['data']):
print('二维码未失效,请扫码!')
elif '-5' in str(qr_code_data['data']):
print('已扫码,请确认!')
elif '-2' in str(qr_code_data['data']):
print('二维码已失效,请重新运行!')
break
elif 'True' in str(qr_code_data['status']):
print('已确认,登入成功!')
session.get(qr_code_data['data']['url'], headers=headers)
break
else:
print('其他:', qr_code_data)
time_sleep(2)
print(session.cookies)
session.cookies.save()
return session
def connect_to_database():
# 连接本地 SQLite 数据库
conn = sqlite3.connect('bili_data.db')
cursor = conn.cursor()
# 检查是否已存在表
cursor.execute('''
CREATE TABLE IF NOT EXISTS bilibili_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
cover TEXT,
author_name TEXT,
author_face TEXT,
view_at INTEGER,
tag_name TEXT,
bvid TEXT,
oid TEXT,
business TEXT,
part TEXT,
badge TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS bilibili_login_info (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT,
login_status TEXT,
cookies TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
return conn, cursor
def get_latest_created_at(cursor,keepWhile=keepWhile,daily_time=daily_time):
# 获取最新记录的 created_at 字段
latest_record = cursor.execute('''
SELECT created_at FROM bilibili_history
ORDER BY created_at DESC
LIMIT 1
''').fetchone()
if latest_record:
# 将字符串转换为 datetime 对象
latest_created_at = datetime.strptime(latest_record[0], '%Y-%m-%d %H:%M:%S')
# 计算最早和最晚时间戳
latest_fetch = int(latest_created_at.timestamp())
oldest_fetch = int((latest_created_at - timedelta(days=daily_time)).timestamp())
print(f"本次获取应该是从 {latest_fetch} 开始")
else:
keepWhile = 2
latest_fetch = 0
oldest_fetch = 0
print("数据库中没有记录")
return latest_fetch, oldest_fetch
def save_login_info(username, login_status, cookies):
# 创建数据库连接
conn, cursor = connect_to_database()
# 插入登录信息到表中
cursor.execute('''
INSERT INTO bilibili_login_info (username, login_status, cookies)
VALUES (?, ?, ?)
''', (username, login_status, cookies))
conn.commit()
# 关闭数据库连接
conn.close()
def fetch_bilibili_history(keepWhile=keepWhile,skip_limit=skip_limit,retry_limit=retry_limit):
# 创建数据库连接
conn, cursor = connect_to_database()
# 获取最新记录的 created_at 字段
latest_fetch, oldest_fetch = get_latest_created_at(cursor,keepWhile=keepWhile)
# 开始循环获取
skip_count = 0 # 连续跳过计数器
while True:
# 发送 GET 请求到 Bilibili API,并添加自动重试逻辑
retry_count = 0
while retry_count < retry_limit:
try:
response = requests.get(
url, params=initial_params, cookies=cookies)
response.raise_for_status() # 检查网络请求是否成功
break
except requests.exceptions.RequestException as e:
retry_count += 1
print(f"获取数据失败。正在重试 {retry_count}/{retry_limit} 次...")
print(f"错误信息: {e}")
sleep(2)
# 检查是否成功获取数据
if retry_count == retry_limit or response.status_code != 200:
print(f"{retry_count} 次重试后仍然无法获取数据。退出程序。")
break
# 如果连续跳过次数超过限制,停止循环
if skip_count >= skip_limit:
print('结束')
break
# 检查 API 返回的 code 值
api_code = response.json().get('code', -400) # 默认为 -400,表示请求错误
if api_code == 0:
conf = response.json()['data']['cursor']
data = response.json()['data']['list']
# 本次获取的第一组
if initial_params['max'] == '0' and initial_params['view_at'] == '0':
print('====开始循环获取浏览历史====')
else:
for item in data:
# 取出符合条件的数据,去除重复
if item['view_at'] > latest_fetch or keepWhile == 2 or keepWhile == 1:
if item['view_at'] > latest_fetch:
selected_data = cursor.execute('''
SELECT DISTINCT bvid, author_name, view_at FROM bilibili_history
WHERE bvid = ? AND author_name = ? AND view_at BETWEEN ? AND ?
ORDER BY view_at DESC
''', (item['history']['bvid'], item['author_name'], oldest_fetch, latest_fetch)).fetchall()
if keepWhile == 1 or keepWhile == 2:
selected_data = cursor.execute('''
SELECT DISTINCT bvid, author_name, view_at FROM bilibili_history
WHERE bvid = ? AND author_name = ?
ORDER BY view_at DESC
''', (item['history']['bvid'], item['author_name'])).fetchall()
if selected_data:
if keepWhile == 1:
skip_count = 0
# 跳过已有数据
else:
if skip_count <= skip_limit:
skip_count += 1
else:
if skip_count <= skip_limit:
skip_count = 0
#print('插入新记录')
# 插入新记录
cursor.execute('''
INSERT OR IGNORE INTO bilibili_history (title, cover, author_name, author_face, view_at, tag_name, bvid, oid, business, part, badge, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
item['title'],
item['cover'],
item['author_name'],
item['author_face'],
item['view_at'],
item['tag_name'],
item['history']['bvid'],
item['history']['oid'],
item['history']['business'],
item['history']['part'],
item['badge'],
datetime.fromtimestamp(item['view_at'], timezone(timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S')
))
else:
print('已经是最新的数据')
skip_count = skip_limit
break
# 提交更改
conn.commit()
# 更新参数,继续下一轮请求
initial_params['max'] = str(conf['max'])
initial_params['view_at'] = str(conf['view_at'])
else:
break
print(f"API 返回错误代码 {api_code}。正在退出。")
# 关闭数据库连接
conn.close()
if __name__ == '__main__':
headers = {'User-Agent': get_user_agents(), 'Referer': "https://www.bilibili.com/"}
headerss = {'User-Agent': get_user_agents(), 'Host': 'passport.bilibili.com', 'Referer': "https://passport.bilibili.com/login"}
if cookies['SESSDATA']:
fetch_bilibili_history()
else:
bz_login()
print('将获取的session填入脚本开头的cookie中')
Sharing is caring!