源碼剖析 - 公衆號採集閱讀器 Liuli

簡介


無意中發現 Liuli 這個項目,項目 Github:https://github.com/liuli-io/liuli

看了其文章,發現 Liuli 是 Python 實現的,便打算簡單看看其實現細節,老規矩,看項目,先將好奇點寫下來:

對,我就對這兩點感興趣,經過一番閱讀後,關於好奇 1,其實人家沒有實現漂亮的 PC 軟件界面,Liuli 只是採集,然後將內容推送過去,所以本文的重點,就是看一下它是怎麼採集公衆號文章的,此外在閱讀過程中,發現 LiuLi 還使用了簡單的方法來識別文章是否爲廣告文章,這點也挺有意思的,也記錄一下。

公衆號文章採集

Liuli 基於搜狗微信(https://weixin.sogou.com/)對公衆號文章進行採集,而且實現了 2 種方式:

我們可以通過相應的配置文件控制 Liuli 使用其中哪種方式來進行文章採集,其默認使用 ruia 的方式進行採集。

Liuli 將功能分爲多個模塊,然後通過調度器去調度不同的模塊,調度器啓動方法代碼如下:

# src/liuli_schedule.py

def start(ll_config_name: str = ""):
    """調度啓動函數

    Args:
        task_config (dict): 調度任務配置
    """
    if not ll_config_name:
        freeze_support()

        # 默認啓動 liuli_config 目錄下所有配置
        ll_config_name_list = []
        for each_file in os.listdir(Config.LL_CONFIG_DIR):
            if each_file.endswith("json"):
                # 加入啓動列表
                ll_config_name_list.append(each_file.replace(".json"""))
        # 進程池
        p = Pool(len(ll_config_name_list))
        for each_ll_config_name in ll_config_name_list:
            LOGGER.info(f"Task {each_ll_config_name} register successfully!")
            p.apply_async(run_liuli_schedule, args=(each_ll_config_name,))
        p.close()
        p.join()

    else:
        run_liuli_schedule(ll_config_name)

從代碼可知,調度器會啓動 Python 的進程池,然後向其中添加 run_liuli_schedule 異步任務,該異步任務中,會執行 run_liuli_task 方法,該方法纔是一次完整的任務流程,代碼如下:

def run_liuli_task(ll_config: dict):
    """執行調度任務

    Args:
        ll_config (dict): Liuli 任務配置
    """
    # 文章源, 用於基礎查詢條件
    doc_source: str = ll_config["doc_source"]
    basic_filter = {"basic_filter"{"doc_source": doc_source}}
    # 採集器配置
    collector_conf: dict = ll_config["collector"]
    # 處理器配置
    processor_conf: dict = ll_config["processor"]
    # 分發器配置
    sender_conf: dict = ll_config["sender"]
    sender_conf.update(basic_filter)
    # 備份器配置
    backup_conf: dict = ll_config["backup"]
    backup_conf.update(basic_filter)

    # 採集器執行
    LOGGER.info("採集器開始執行!")
    for collect_type, collect_config in collector_conf.items():
        collect_factory(collect_type, collect_config)
    LOGGER.info("採集器執行完畢!")
    # 採集器執行
    LOGGER.info("處理器(after_collect): 開始執行!")
    for each in processor_conf["after_collect"]:
        func_name = each.pop("func")
        # 注入查詢條件
        each.update(basic_filter)
        LOGGER.info(f"處理器(after_collect): {func_name} 正在執行...")
        processor_dict[func_name](**each)
    LOGGER.info("處理器(after_collect): 執行完畢!")
    # 分發器執行
    LOGGER.info("分發器開始執行!")
    send_doc(sender_conf)
    LOGGER.info("分發器執行完畢!")
    # 備份器執行
    LOGGER.info("備份器開始執行!")
    backup_doc(backup_conf)
    LOGGER.info("備份器執行完畢!")

從 run_liuli_task 方法可知,Liuli 一次任務需要執行:

關於 Liuli 的功能,可以閱讀作者本人的文章: 基於 Liuli 構建純淨的 RSS 公衆號信息流,這裏先只關注公衆號採集的邏輯。

因爲有 ruia 與 playwright 兩種不同方式實現的採集器,具體使用哪種,通過配置文件決定,然後通過 import_module 方法動態導入相應的模塊,然後運行模塊的 run 方法,從而實現公衆號文章的採集,相關代碼如下:

def collect_factory(collect_type: str, collect_config: dict) -> bool:
    """
    採集器工廠函數
    :param collect_type: 採集器類型
    :param collect_config: 採集器配置
    :return:
    """
    collect_status = False
    try:
        # import_module方法動態載入具體的採集模塊
        collect_module = import_module(f"src.collector.{collect_type}")
        collect_status = collect_module.run(collect_config)
    except ModuleNotFoundError:
        LOGGER.error(f"採集器類型不存在 {collect_type} - {collect_config}")
    except Exception as e:
        LOGGER.error(f"採集器執行出錯 {collect_type} - {collect_config} - {e}")
    return collect_status

playwright 採集模塊實現

playwright 是微軟出品的自動化庫,與 selenium 的作用類似,定位於網頁測試,但也被人用於網頁信息的獲取,可見即可得,使用門檻低,因爲要加載網頁信息,所以性能比較差,當然一些前端反爬的措施,playwright 也無法突破。

playwright 相比於 selenium,支持 python 的 async,性能有所提升(但還是比不了直接請求),這裏貼一下獲取某公衆號下最新文章的部分邏輯(完整代碼太長):

async def playwright_main(wechat_name: str):
    """利用 playwright 獲取公衆號元信息,輸出數據格式見上方
    Args:
        wechat_name ([str]): 公衆號名稱
    """
    wechat_data = {}
    try:
        async with async_playwright() as p:
            # browser = await p.chromium.launch(headless=False)
            browser = await p.chromium.launch()
            context = await browser.new_context(user_agent=Config.SPIDER_UA)
            page = await context.new_page()
            # 進行公衆號檢索
            await page.goto("https://weixin.sogou.com/")
            await page.wait_for_load_state()
            await page.click('input[]')
            await page.fill('input[]', wechat_name)
            await asyncio.sleep(1)
            await page.click("text=搜公衆號")
            await page.wait_for_load_state()

從上述代碼可知,playwright 用法與 selenium 很像,將用戶操作網站的流程自動化便可以獲取相應的數據了。

ruia 採集模塊實現

ruia 是輕量級的 Python 異步爬蟲框架,因爲比較輕量,我也將其代碼讀了一遍,作爲下篇文章的內容。

它的用法與 scrapy 有點像,需要定義繼承於 ruia.Spider 的子類,然後調用 start 方法實現對目標網站的請求,然後 ruia 會自動調用 parse 方法實現對網頁內容的解析,來看一下具體的代碼,首先是入口邏輯:

def run(collect_config: dict):
    """微信公衆號文章抓取爬蟲

    Args:
        collect_config (dict, optional): 採集器配置
    """
    s_nums = 0
    wechat_list = collect_config["wechat_list"]
    delta_time = collect_config.get("delta_time", 5)
    for wechat_name in wechat_list:
        SGWechatSpider.wechat_name = wechat_name
        SGWechatSpider.request_config = {
            "RETRIES": 3,
            "DELAY": delta_time,
            "TIMEOUT": 20,
        }
        sg_url = f"https://weixin.sogou.com/weixin?type=1&query={wechat_name}&ie=utf8&s_from=input&_sug_=n&_sug_type_="
        SGWechatSpider.start_urls = [sg_url]
        try:
            # 啓動爬蟲
            SGWechatSpider.start(middleware=ua_middleware)
            s_nums += 1
        except Exception as e:
            err_msg = f"😿 公衆號->{wechat_name} 文章更新失敗! 錯誤信息: {e}"
            LOGGER.error(err_msg)

    msg = f"🤗 微信公衆號文章更新完畢({s_nums}/{len(wechat_list)})!"
    LOGGER.info(msg)

上述代碼中,通過 SGWechatSpider.start (middleware=ua_middleware) 啓動了爬蟲,它會自動請求 start_urls 的 url,然後回調 parse 方法,parse 方法代碼如下:

    async def parse(self, response: Response):
        """解析公衆號原始鏈接數據"""
        html = await response.text()
        item_list = []
        async for item in SGWechatItem.get_items(html=html):
            if item.wechat_name == self.wechat_name:
                item_list.append(item)
                yield self.request(
                    url=item.latest_href,
                    metadata=item.results,
                    # 下一個回調方法
                    callback=self.parse_real_wechat_url,
                )
                break

parse 方法中,會通過 self.request 請求新的 url,然後回調 self.parse_real_wechat_url 方法,一切都與 scrapy 如此相似。

至此,採集模塊的閱讀就結束了(代碼中還涉及一些簡單的數據清洗,本文就不討論了),沒有特別複雜的部分,從代碼上看,也沒有發送作者做反爬邏輯的處理,搜狗微信沒有反爬?

廣告文章識別

接着看一下廣告文章識別,Liuli 對於廣告文章,還是會採集的,採集後,在文章處理模塊,會將廣告文章標註出來,先理一下廣告文章標註的入口邏輯,回到 liuli_schedule.py 的 run_lili_task 方法,關注到 process(文章處理模塊)的邏輯,代碼如下:

    LOGGER.info("處理器(after_collect): 開始執行!")
    for each in processor_conf["after_collect"]:
        func_name = each.pop("func")
        # 注入查詢條件
        each.update(basic_filter)
        LOGGER.info(f"處理器(after_collect): {func_name} 正在執行...")
        processor_dict[func_name](**each)
    LOGGER.info("處理器(after_collect): 執行完畢!")

從上述代碼可知,處理器的主要邏輯是 processor_dict 字典中的方法,該字典的定義的路徑爲 src/processor/__init__.py,代碼如下:

from .rss_utils import to_rss
from .text_utils import (
    ad_marker,
    extract_core_html,
    extract_keyword_list,
    html_to_text_h2t,
    str_replace,
)

processor_dict = {
    "to_rss": to_rss,
    "ad_marker": ad_marker,
    "str_replace": str_replace,
}

其中 ad_marker 方法便是識別文章是否爲廣告文章的方法,其實寫的有點繞,核心邏輯就是計算當前文章與採集到的廣告文章詞頻構建向量的餘弦值,判斷餘弦值大小來判斷是否爲廣告文章,簡單看一下相關的邏輯。

ad_marker 方法中會調用 model_predict_factory 方法,將當前文章的標題、文章內容以及分類的 cos_value 傳入,相關代碼如下(清理了代碼,只展示了需要部分):

def ad_marker(
    cos_value: float = 0.6,
    is_force=False,
    basic_filter={},
    **kwargs,
):
    # 基於餘弦相似度
    cos_model_resp = model_predict_factory(
        model_,
        model_path="",
        input_dict={"text": doc_name + doc_keywords, "cos_value": cos_value},
        # input_dict={"text": doc_name, "cos_value": Config.COS_VALUE},
    ).to_dict()

cos_value 爲 0.6,即如果計算出當前文章與廣告文章餘弦值大於等於 0.6,則認爲當前文章爲廣告文章,其最終預測邏輯在 classifier/model_base/cos_model_loader.py 的 predict 方法中,代碼如下:

def predict(self, text: str, cos_value: float = 0.8) -> dict:
    """
    對文本相似度進行預測
    :param text: 文本
    :param cos_value: 閾值 默認是0.9
    :return:
    """
    max_pro, result = 0.0, 0
    for each in self.train_data:
        # 餘弦值具體的運算邏輯
        cos = CosineSimilarity(self.process_text(text), each)
        res_dict = cos.calculate()
        value = res_dict["value"]
        # 大於等於cos_value,就返回1,則表示當前的文章是廣告文章
        result = 1 if value >= cos_value else 0
        max_pro = value if value > max_pro else max_pro
        if result == 1:
            break

    return {"result": result, "value": max_pro}

餘弦值具體的運算邏輯在 CosineSimilarity 的 calculate 方法中,都是數學相關的代碼,就不看了,其核心是希望判斷當前文章與廣告文章的相似度,類似的還可以通過 TFIDF、文本聚類等算法來做,相關的庫,幾行代碼就可以搞定(所以我感覺這裏寫繞了)。

其餘可參考邏輯

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/tV3z88eKAcYsb1oG1thbFw