dify 實現分析 - rag - 文件上傳後的處理

概述

前面的文章:已經說明了文檔上傳的總體步驟。當上傳一個或多個文檔後,dify 會啓動索引的構建任務來處理文檔內容,並構建索引。本文介紹文檔索引構建的實現邏輯。

document_indexing_task 函數

新建文檔的索引構建在函數document_indexing_task中實現,該函數的總體邏輯如下圖:

上傳文檔的處理在 document_indexing_task 函數中進行。該函數主要是記錄文檔的狀態,並啓動索引的構建任務。

  1. 任務定義

使用 Celery 的 @shared_task 裝飾器定義異步任務,指定任務隊列爲 "dataset",並以數據集 ID 和文檔 ID 列表作爲參數。代碼如下:

@shared_task(queue="dataset")
def document_indexing_task(dataset_id: str, document_ids: list):
  1. 數據集驗證

查詢並驗證數據集是否存在,如果數據集不存在,記錄日誌並返回。

dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
if not dataset:
    logging.info(click.style("Dataset is not found: {}".format(dataset_id)fg="yellow"))
    return
  1. 租戶限額檢查

檢查租戶的功能限制:驗證批量上傳限制和向量空間限制,若超出限制,將所有文檔標記爲錯誤狀態。

features = FeatureService.get_features(dataset.tenant_id)
try:
    if features.billing.enabled:
        # 檢查批量上傳限制
        # 檢查向量空間限制
except Exception as e:
    # 處理錯誤:更新所有相關文檔的狀態爲錯誤
    for document_id in document_ids:
        document = db.session.query(Document).filter(...).first()
        if document:
            document.indexing_status = "error"
            document.error = str(e)
            document.stopped_at = datetime.datetime.now(...)
            db.session.add(document)
    db.session.commit()
    return
  1. 文檔預處理

遍歷所有文檔 ID:查詢每個文檔,更新文檔狀態爲 "parsing",和記錄處理開始時間。

for document_id in document_ids:
    logging.info(click.style("Start process document: {}".format(document_id)fg="green"))
    document = db.session.query(Document).filter(...).first()
    
    if document:
        # 更新文檔狀態爲解析中
        document.indexing_status = "parsing"
        document.processing_started_at = datetime.datetime.now(...)
        documents.append(document)
        db.session.add(document)
db.session.commit()
  1. 針對 document 構建索引

創建 IndexingRunner 實例,運行文檔索引處理,並記錄處理耗時和處理可能的暫停錯誤和其他異常。

try:
    indexing_runner = IndexingRunner()
    indexing_runner.run(documents)
    end_at = time.perf_counter()
    logging.info(click.style("Processed dataset: {} latency: {}".format(
        dataset_id, end_at - start_at)fg="green"))
except DocumentIsPausedError as ex:
    logging.info(click.style(str(ex)fg="yellow"))
except Exception:
    pass

索引構建:IndexingRunner.run

該函數的總體實現邏輯如下圖所示:

IndexingRunner

其中 run 函數的功能和實現步驟如下:

(1)遍歷傳入的 dataset_documents 列表,對每個文檔執行索引操作。

(2)從數據庫中獲取相應(通過數據集 id 來查詢)的數據集(Dataset),若不存在,停止處理拋出錯誤。

(3)獲取當前數據集和文檔的處理規則(Processing Rule)

(4)根據文檔類型(doc_form),選擇合適的索引處理器(IndexProcessor)。dify 提供了 4 中索引構建器,可以在 IndexType 類中查看這幾種索引構建器的名稱。

(5)使用 self._extract 函數根據文檔不同格式獲取文檔中的段落數據,並保存到 Document 對象中

(6)使用 self._transform 方法將提取的文本數據轉換爲內部使用的 Document 對象:把第(5)步獲取到的文檔數據進行進一步切分,切分成一個個的 chunk,每個 chunk 是一個 Document 對象。

(7)將提取和轉換後的文檔分段(chunk)信息保存到 DocumentSegment 表中。

(8)使用 self._load 根據文檔分段來構建索引,或文檔向量。

總結

通過以上分析已經知道了整個索引構建的全部流程。索引構建每個步驟的詳細實現邏輯在後面的文章中繼續分析。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/kgimAa77BKnAQD4XhIctvg