Web Crawler for EastMoney Website

I used five files to implement the crawling and immediate saving of data. The code was written with pointers from someone more experienced and is not entirely original.

First we need to obtain the post URLs from EastMoney's stock forums (guba). Analysis shows that each forum's listing pages follow a structure like https://guba.eastmoney.com/list,code_k.html, where list marks a listing page, code is the stock code, and _k is the page number on which the posts sit.

A post's URL, in turn, looks like https://guba.eastmoney.com/news,code,code_of_comment.html, where code_of_comment is the post's own ID in EastMoney's database. So on each listing page we need to collect the IDs of the posts shown on that page and record them so that each comment can be retrieved later.
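As a concrete sketch (using the stock code 600436 that the scripts below target, and a hypothetical post ID purely for illustration), a listing-page URL can be assembled from the code and page number in the same pattern the first script uses, and the post ID can be recovered from a post URL with a regular expression:

import re

code = '600436'   # stock code used by the scripts below
page = 3
# page 3 of the listing, following the pattern built by the first script
list_url = f'https://guba.eastmoney.com/list,{code},f_{page}.html'

# hypothetical post URL: the number after the second comma is the post ID
post_url = f'https://guba.eastmoney.com/news,{code},1234567890.html'
post_id = re.search(r',(\d+)\.html$', post_url).group(1)

print(list_url)   # https://guba.eastmoney.com/list,600436,f_3.html
print(post_id)    # 1234567890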

So, following the listing-page URLs, we crawl the URLs of the posts under each list page:

import asyncio
import aiohttp
import random
import requests
from lxml import etree
import pandas as pd

headers = {
    "Cookie": "qgqp_b_id=168d1a64114881cba2ba3d84da74e15f; st_si=59731079127804; st_pvi=90547074541767; st_sn=1; st_sp=2024-03-01%2016%3A29%3A21; st_inirUrl=; st_psi=20240301162920730-117001356556-9050495823; st_asi=20240301162920730-117001356556-9050495823-Web_so_srk-2",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
}
page_url = "https://guba.eastmoney.com/list,600436,f_"

api_url = '...'  # proxy API link generated with Kuaidaili (快代理)
api_url2 = '...'

# Fetch the initial proxy list
proxy_list = requests.get(api_url).json().get('data').get('proxy_list')
print(proxy_list)

# Initialize an empty DataFrame to store the post links
global_df = pd.DataFrame(columns=['url'])

async def fetch(url):
    code='600436'
    try:
        async with aiohttp.ClientSession() as session:
            proxy = random.choice(proxy_list)
            print(url)
            await asyncio.sleep(0.01)
            async with session.get(url, proxy="http://" + proxy, headers=headers) as resp:
                content = await resp.read()
                content_text = content.decode('utf-8')
                tree = etree.HTML(content_text)
                href_list = [i for i in tree.xpath('//div[@class="title"]/a/@href') if code in i]

                # Append the extracted links to the global DataFrame
                for href in href_list:
                    global_df.loc[len(global_df)] = [href]

    except aiohttp.ClientConnectorError:
        # the proxy was probably banned: drop it, top up the pool, and retry once with a fresh one
        try:
            proxy_list.remove(proxy)
            proxy_list.extend(requests.get(api_url2).json().get('data').get('proxy_list'))
            async with aiohttp.ClientSession() as session:
                proxy = random.choice(proxy_list)
                print(url)
                async with session.get(url, proxy="http://" + proxy, headers=headers) as resp:
                    content = await resp.read()
                    content_text = content.decode('utf-8')
                    tree = etree.HTML(content_text)
                    href_list = [i for i in tree.xpath('//div[@class="title"]/a/@href') if code in i]

                    # Append the extracted links to the global DataFrame
                    for href in href_list:
                        global_df.loc[len(global_df)] = [href]
        except Exception:
            # give up on this page if the retry with a fresh proxy also fails
            pass

async def main(num):
    tasks = [fetch(page_url+str(i)+".html") for i in range(num, num+100)]
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    for i in range(0, 1200, 100):
        try:
            asyncio.run(main(i))

            # After this batch of coroutines finishes, append the collected links to the CSV file
            global_df.to_csv('list_600436.csv', mode='a', index=False, encoding='utf-8')
            global_df = global_df.iloc[0:0]  # reset so already-saved links are not appended again

        except Exception:
            print(f'error{i}--{i + 100}')

I used an asynchronous crawler. The advantage is that an error does not terminate the program outright but simply skips that page, and it is also convenient for removing banned IPs from the proxy pool.
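As a minimal sketch of this error-tolerance pattern (not taken from the scripts above, which instead catch exceptions inside fetch), asyncio.gather can be asked to return exceptions instead of raising them, so one failed page does not cancel the whole batch and the failing requests can be inspected afterwards:

import asyncio

async def fetch_demo(i):
    # hypothetical stand-in for fetch(): some pages fail
    if i % 3 == 0:
        raise ConnectionError(f'page {i} failed')
    return f'page {i} ok'

async def run_batch():
    tasks = [fetch_demo(i) for i in range(6)]
    # return_exceptions=True lets the batch finish; failures come back as exception objects
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for r in results:
        print('skipped ->' if isinstance(r, Exception) else 'got ->', r)

asyncio.run(run_batch())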


After obtaining the URLs, the URL DataFrame above needs to be cleaned to remove duplicates:

import pandas as pd
df=pd.read_csv("list_600436.csv")
df_cleaned = df.drop_duplicates()
df_cleaned.to_csv('list_600436_washed.csv',encoding="utf-8-sig",index=False)

Then crawl the comments from the URLs obtained.


import asyncio
import os.path
import shutil
import json
import aiohttp
import random
import requests
from lxml import etree
import pandas as pd
import re
from tqdm import tqdm
df=pd.read_csv("list_600436_washed.csv")
urls=['https://guba.eastmoney.com'+i for i in df['url'].tolist() if 'url' not in i]
headers = {
    "Cookie": "qgqp_b_id=168d1a64114881cba2ba3d84da74e15f; st_si=59731079127804; st_pvi=90547074541767; st_sn=1; st_sp=2024-03-01%2016%3A29%3A21; st_inirUrl=; st_psi=20240301162920730-117001356556-9050495823; st_asi=20240301162920730-117001356556-9050495823-Web_so_srk-2",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
}
page_url = "https://guba.eastmoney.com/list,600436 _"

api_url = '...'
api_url2 = '...'

# Fetch the initial proxy list
proxy_list = requests.get(api_url).json().get('data').get('proxy_list')
# proxy_list=['171.42.101.205:16924', '221.229.212.170:40423', '183.164.57.189:22701', '171.41.151.91:20304', '114.99.10.224:21246']
print(proxy_list)
MAX_RETRIES=5

async def fetch(url):
    global useless
    retries=0
    # An empty DataFrame for this post's url, title, content and publish time
    global_df = pd.DataFrame(columns=['url', 'title', 'content', 'time'])
    # the post ID embedded in the URL is used as the per-post file name
    pattern = re.compile(r',(\d+)\.')
    text = "".join(re.findall(pattern, url))
    while retries < MAX_RETRIES:
        try:
            await asyncio.sleep(0.01)
            async with aiohttp.ClientSession() as session:
                proxy = random.choice(proxy_list)
                print(url)
                async with session.get(url, proxy="http://" + proxy, headers=headers) as resp:
                    if resp.status==200:
                        content = await resp.read()
                        content_text = content.decode('utf-8')
                        tree = etree.HTML(content_text)
                        # the post data is embedded in the page as a JS variable: var post_article = {...}
                        data = tree.xpath("/html/body/script[2]/text()")[0].replace('var post_article=', '')
                        article_data = json.loads(data)
                        title = article_data["post_title"]
                        content = re.sub(r'<a[^>]*>(.*?)</a>', r'\1', article_data["post_content"], flags=re.DOTALL)
                        time1 = article_data["post_publish_time"]
                        global_df.loc[len(global_df)] = [url,title,content,time1]
                        break
                    else:
                        print(resp.status)
        except Exception:
            # mark this proxy as dead, top up the pool with fresh proxies, and retry
            useless.append(proxy)
            proxy_list.extend(requests.get(api_url2).json().get('data').get('proxy_list'))
            print('refreshing proxies...')
            await asyncio.sleep(0.01)
        retries += 1

    global_df.to_csv(f'D:/comments_temp/{text}.csv',encoding='utf8',index=False)

async def main(num):
    # slice so the final batch does not run past the end of the url list
    tasks = [fetch(u) for u in urls[num:num + 50]]
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    if os.path.exists('D:/comments_temp'):
        shutil.rmtree('D:/comments_temp')
    os.makedirs('D:/comments_temp')

    for i in tqdm(range(0, len(urls), 50)):
        useless = []

        asyncio.run(main(i))

        # drop the proxies that failed in this batch from the pool
        for bad_proxy in set(useless):
            if bad_proxy in proxy_list:
                proxy_list.remove(bad_proxy)

To be safe, every crawled post is saved in its own CSV file. This prevents all the data from being lost if the program errors out once the proxy pool is exhausted (the proxy pool is not cheap). So after all the posts have been collected, the comments still need to be merged.
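A convenient side effect of the one-file-per-post layout, sketched below under the assumption that the temporary directory is not wiped on startup (the script above does delete it), is that an interrupted run can be resumed by skipping URLs whose CSV already exists. already_saved is a hypothetical helper, not part of the original scripts:

import os
import re

def already_saved(url, out_dir='D:/comments_temp'):
    # the per-post file name is the post ID extracted from the URL, same regex as fetch() above
    post_id = ''.join(re.findall(r',(\d+)\.', url))
    return os.path.exists(os.path.join(out_dir, f'{post_id}.csv'))

# hypothetical usage: only keep URLs that have not been saved yet
# remaining = [u for u in urls if not already_saved(u)]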


Merging the comments:

import os
import pandas as pd
import csv
csv_files=[f"D:/comments_temp/{i}" for i in os.listdir("D:/comments_temp")]

# Read all of the per-post CSV files and merge them
df_list = [pd.read_csv(file) for file in csv_files]
df_merged = pd.concat(df_list, ignore_index=True)
df_cleaned = df_merged.drop_duplicates()
df_cleaned.to_csv("temp.csv", encoding='utf-8-sig', index=False)
file=open('temp.csv','r',encoding='utf8')
reader=csv.reader(file)
head=next(reader)
new_rows=[]
for row in reader:
    # skip rows where three or more fields are empty '[]' placeholders, and stray header rows
    num = str(row).count("'[]'")
    if num >= 3 or row[0] == "url":
        pass
    else:
        new_rows.append(row)

file2=open("comments_clean.csv",'w',encoding="utf-8-sig",newline='')
writer=csv.writer(file2)
writer.writerow(head)
for row2 in new_rows:
    writer.writerow(row2)
file.close()
file2.close()

Finally, remove the non-text information from the crawled comments, such as extra brackets, tags, and other markers.

import pandas as pd
import re

df=pd.read_csv("comments_clean.csv")
wash = pd.DataFrame(columns=["content"])
for i in df["content"]:
    i=str(i)
    # strip $...$ stock tags, {...} placeholders and residual HTML tags from the comment text
    i = re.sub(u"\\$.*?\\$|\\{.*?}|\\<.*?>", "", i)
    i = i.replace(" ","")
    wash.loc[len(wash)] = i

df["content"] = wash["content"]
df.to_csv('comments_final.csv',encoding='utf8',index=False)
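As a quick illustration of what the regular expression removes, here is a hypothetical raw comment containing a $...$ stock tag, an HTML tag, and a curly-brace placeholder:

import re

# hypothetical raw comment text, purely for demonstration
sample = '$片仔癀(SH600436)$今天<em>大涨</em>了 {image}'
cleaned = re.sub(u"\\$.*?\\$|\\{.*?}|\\<.*?>", "", sample).replace(" ", "")
print(cleaned)   # -> 今天大涨了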

Next, a deep-learning model on Baidu Intelligent Cloud is used for the sentiment analysis; see: Baidu Intelligence Cloud and Sentiment Analysis
