I used five files to implement the crawling and on-the-fly saving of the data. The code is not entirely my own; it was written with pointers from people far more experienced than me.
First we need the post URLs from the Eastmoney Guba forum. Analysis shows that the listing pages of each stock board follow the pattern
https://guba.eastmoney.com/list,code_k.html
where list marks a listing page, code is the stock code, and _k is the page number on which the posts appear.
A post URL, in turn, follows the pattern
https://guba.eastmoney.com/news,code,code_of_comment.html
where code_of_comment is the post's own id in Eastmoney's database. So on every listing page we need to collect the post ids shown on that page and record them, so that each post can be retrieved later.
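To make the two patterns concrete, here is a minimal sketch (not one of the five files): the helper names and the example post id 1234567890 are placeholders of mine, and I use the ",f_" listing variant that the script below actually requests.
import re

code = "600436"

def list_page_url(code, page):
    # listing page `page` of the stock board, in the ",f_" ordering used below
    return f"https://guba.eastmoney.com/list,{code},f_{page}.html"

def post_id(post_url):
    # a post URL looks like /news,600436,1234567890.html;
    # the trailing number is the post's id in Eastmoney's database
    m = re.search(r",(\d+)\.html$", post_url)
    return m.group(1) if m else None

print(list_page_url(code, 1))
print(post_id("https://guba.eastmoney.com/news,600436,1234567890.html"))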
So, following the listing-page URLs, we crawl the post URLs on each list page:
import asyncio
import aiohttp
import random
import requests
from lxml import etree
import pandas as pd
headers = {
    "Cookie": "qgqp_b_id=168d1a64114881cba2ba3d84da74e15f; st_si=59731079127804; st_pvi=90547074541767; st_sn=1; st_sp=2024-03-01%2016%3A29%3A21; st_inirUrl=; st_psi=20240301162920730-117001356556-9050495823; st_asi=20240301162920730-117001356556-9050495823-Web_so_srk-2",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
}
page_url = "https://guba.eastmoney.com/list,600436,f_"
api_url = '...'   # proxy API link generated with Kuaidaili (快代理)
api_url2 = '...'
# fetch the initial proxy list
proxy_list = requests.get(api_url).json().get('data').get('proxy_list')
print(proxy_list)
# initialize an empty DataFrame to store the links
global_df = pd.DataFrame(columns=['url'])
async def fetch(url):
    code = '600436'
    try:
        async with aiohttp.ClientSession() as session:
            proxy = random.choice(proxy_list)
            print(url)
            await asyncio.sleep(0.01)
            async with session.get(url, proxy="http://" + proxy, headers=headers) as resp:
                content = await resp.read()
                content_text = content.decode('utf-8')
                tree = etree.HTML(content_text)
                # keep only links that contain the stock code
                href_list = [i for i in tree.xpath('//div[@class="title"]/a/@href') if code in i]
                # append the extracted links to the global DataFrame
                for href in href_list:
                    global_df.loc[len(global_df)] = [href]
    except aiohttp.ClientConnectorError:
        try:
            # the proxy was probably banned: drop it, top the pool up, and retry once
            proxy_list.remove(proxy)
            proxy_list.extend(requests.get(api_url2).json().get('data').get('proxy_list'))
            async with aiohttp.ClientSession() as session:
                proxy = random.choice(proxy_list)
                print(url)
                async with session.get(url, proxy="http://" + proxy, headers=headers) as resp:
                    content = await resp.read()
                    content_text = content.decode('utf-8')
                    tree = etree.HTML(content_text)
                    href_list = [i for i in tree.xpath('//div[@class="title"]/a/@href') if code in i]
                    # append the extracted links to the global DataFrame
                    for href in href_list:
                        global_df.loc[len(global_df)] = [href]
        except Exception:
            pass
async def main(num):
    tasks = [fetch(page_url + str(i) + ".html") for i in range(num, num + 100)]
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    for i in range(0, 1200, 100):
        try:
            asyncio.run(main(i))
            # after each batch of coroutines finishes, append the global DataFrame to the CSV file
            # (re-appending the growing DataFrame creates duplicates, which are removed in the next step)
            global_df.to_csv('list_600436.csv', mode='a', index=False, encoding='utf-8')
        except Exception:
            print(f'error{i}--{i + 100}')
I use an asynchronous crawler: a failed request is caught and skipped rather than terminating the whole program, and it also makes it convenient to remove banned IPs from the proxy pool.
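The skip-on-failure behaviour comes from catching the exception inside each coroutine instead of letting it bubble up to asyncio.gather. A stripped-down sketch of that pattern (the proxy values are placeholders and refresh_proxies stands in for the Kuaidaili API call used above):
import asyncio
import random
import aiohttp

proxy_list = ["1.2.3.4:8000", "5.6.7.8:8000"]  # placeholder proxies

def refresh_proxies():
    # stands in for requests.get(api_url2).json()... in the real script
    return []

async def fetch_once(url):
    proxy = random.choice(proxy_list)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, proxy="http://" + proxy) as resp:
                return await resp.text()
    except aiohttp.ClientError:
        # drop the bad proxy, top the pool up, and let the task end quietly
        if proxy in proxy_list:
            proxy_list.remove(proxy)
        proxy_list.extend(refresh_proxies())
        return None

async def main(urls):
    # exceptions are handled inside fetch_once, so one dead page
    # never aborts the whole batch
    return await asyncio.gather(*(fetch_once(u) for u in urls))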
After collecting the URLs, the URL DataFrame above has to be cleaned to remove duplicates:
import pandas as pd
df=pd.read_csv("list_600436.csv")
df_cleaned = df.drop_duplicates()
df_cleaned.to_csv('list_600436_washed.csv',encoding="utf-8-sig",index=False)
Then crawl the comments from the URLs we obtained.
import asyncio
import os.path
import shutil
import json
import aiohttp
import random
import requests
from lxml import etree
import pandas as pd
import re
from tqdm import tqdm
df = pd.read_csv("list_600436_washed.csv")
# the appended CSV contains repeated header rows, so drop any value that is just "url"
urls = ['https://guba.eastmoney.com' + i for i in df['url'].tolist() if 'url' not in i]
headers = {
    "Cookie": "qgqp_b_id=168d1a64114881cba2ba3d84da74e15f; st_si=59731079127804; st_pvi=90547074541767; st_sn=1; st_sp=2024-03-01%2016%3A29%3A21; st_inirUrl=; st_psi=20240301162920730-117001356556-9050495823; st_asi=20240301162920730-117001356556-9050495823-Web_so_srk-2",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
}
api_url = '...'
api_url2 = '...'
# fetch the initial proxy list
proxy_list = requests.get(api_url).json().get('data').get('proxy_list')
# proxy_list=['171.42.101.205:16924', '221.229.212.170:40423', '183.164.57.189:22701', '171.41.151.91:20304', '114.99.10.224:21246']
print(proxy_list)
MAX_RETRIES = 5
async def fetch(url):
    global useless
    retries = 0
    # initialize an empty DataFrame to store this post
    global_df = pd.DataFrame(columns=['url', 'title', 'content', 'time'])
    # the post id is the number between the last comma and the dot in the URL
    pattern = re.compile(r',(\d+)\.')
    text = "".join(re.findall(pattern, url))
    while retries < MAX_RETRIES:
        try:
            await asyncio.sleep(0.01)
            async with aiohttp.ClientSession() as session:
                proxy = random.choice(proxy_list)
                print(url)
                async with session.get(url, proxy="http://" + proxy, headers=headers) as resp:
                    if resp.status == 200:
                        content = await resp.read()
                        content_text = content.decode('utf-8')
                        tree = etree.HTML(content_text)
                        # the post data sits in a JS variable inside the second <script> tag
                        data = tree.xpath("/html/body/script[2]/text()")[0].replace('var post_article=', '')
                        article_data = json.loads(data)
                        title = article_data["post_title"]
                        # strip <a> tags but keep their inner text
                        content = re.sub(r'<a[^>]*>(.*?)</a>', r'\1', article_data["post_content"], flags=re.DOTALL)
                        time1 = article_data["post_publish_time"]
                        global_df.loc[len(global_df)] = [url, title, content, time1]
                        break
                    else:
                        print(resp.status)
        except Exception:
            # remember the bad proxy and top the pool up before retrying
            useless.append(proxy)
            proxy_list.extend(requests.get(api_url2).json().get('data').get('proxy_list'))
            print('refreshing proxies...')
            await asyncio.sleep(0.01)
        retries += 1
    # save each post to its own CSV so a crash never loses what was already fetched
    global_df.to_csv(f'D:/comments_temp/{text}.csv', encoding='utf8', index=False)
async def main(num):
    tasks = [fetch(urls[i]) for i in range(num, min(num + 50, len(urls)))]
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    if os.path.exists('D:/comments_temp'):
        shutil.rmtree('D:/comments_temp')
    os.makedirs('D:/comments_temp')
    for i in tqdm(range(0, len(urls), 50)):
        useless = []
        asyncio.run(main(i))
        # after each batch, drop the proxies that failed during it
        for p in set(useless):
            if p in proxy_list:
                proxy_list.remove(p)
For safety, every scraped post is saved in its own CSV file. This way, if the proxy pool runs dry and the program crashes, the data already collected is not lost (the proxy pool is not cheap). Once all the posts have been fetched, the individual files still need to be merged.
Merging the comments:
import os
import pandas as pd
import csv

csv_files = [f"D:/comments_temp/{i}" for i in os.listdir("D:/comments_temp")]
# read every per-post CSV and merge them, then drop exact duplicates
df_list = [pd.read_csv(file) for file in csv_files]
df_merged = pd.concat(df_list, ignore_index=True)
df_cleaned = df_merged.drop_duplicates()
df_cleaned.to_csv("temp.csv", encoding='utf-8-sig', index=False)
file = open('temp.csv', 'r', encoding='utf8')
reader = csv.reader(file)
head = next(reader)
new_rows = []
for row in reader:
    # drop rows that are mostly '[]' placeholders or are repeated header rows
    num = str(row).count("'[]'")
    if num >= 3 or row[0] == "url":
        pass
    else:
        new_rows.append(row)
file2 = open("comments_clean.csv", 'w', encoding="utf-8-sig", newline='')
writer = csv.writer(file2)
writer.writerow(head)
for row2 in new_rows:
    writer.writerow(row2)
file.close()
file2.close()
Finally, strip the non-text information left in the scraped comments, such as extra brackets and markup tokens.
import pandas as pd
import re

df = pd.read_csv("comments_clean.csv")
wash = pd.DataFrame(columns=["content"])
for i in df["content"]:
    i = str(i)
    # remove $...$ stock tags and {...} / <...> markup left over from the page
    i = re.sub(r"\$.*?\$|\{.*?}|<.*?>", "", i)
    i = i.replace(" ", "")
    wash.loc[len(wash)] = i
df["content"] = wash["content"]
df.to_csv('comments_final.csv', encoding='utf8', index=False)
Next, sentiment is analyzed with a Baidu AI Cloud deep-learning model; see: Baidu Intelligence Cloud and Sentiment Analysis.
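For completeness, a minimal sketch of how the cleaned comments might be fed to Baidu AI Cloud's sentiment_classify endpoint. The API key and secret are placeholders, and the endpoint, token flow, and field names are my assumptions about the standard Baidu NLP API; check them against the linked article.
import requests
import pandas as pd

API_KEY = "..."      # placeholder
SECRET_KEY = "..."   # placeholder

def get_access_token():
    # standard Baidu AI Cloud client-credentials token exchange (assumed)
    resp = requests.get(
        "https://aip.baidubce.com/oauth/2.0/token",
        params={"grant_type": "client_credentials",
                "client_id": API_KEY,
                "client_secret": SECRET_KEY},
    )
    return resp.json()["access_token"]

def sentiment(text, token):
    # sentiment_classify returns positive/negative probabilities for one text
    resp = requests.post(
        "https://aip.baidubce.com/rpc/2.0/nlp/v1/sentiment_classify",
        params={"charset": "UTF-8", "access_token": token},
        json={"text": text[:1000]},  # truncate conservatively; the API caps the input length
    )
    return resp.json().get("items", [{}])[0]

df = pd.read_csv("comments_final.csv")
token = get_access_token()
print(sentiment(str(df["content"].iloc[0]), token))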