dify 實現分析 - rag - 文件上傳後的處理
概述
前面的文章:已經說明了文檔上傳的總體步驟。當上傳一個或多個文檔後,dify 會啓動索引的構建任務來處理文檔內容,並構建索引。本文介紹文檔索引構建的實現邏輯。
document_indexing_task 函數
新建文檔的索引構建在函數document_indexing_task
中實現,該函數的總體邏輯如下圖:
上傳文檔的處理在 document_indexing_task 函數中進行。該函數主要是記錄文檔的狀態,並啓動索引的構建任務。
- 任務定義
使用 Celery 的 @shared_task 裝飾器定義異步任務,指定任務隊列爲 "dataset",並以數據集 ID 和文檔 ID 列表作爲參數。代碼如下:
@shared_task(queue="dataset")
def document_indexing_task(dataset_id: str, document_ids: list):
- 數據集驗證
查詢並驗證數據集是否存在,如果數據集不存在,記錄日誌並返回。
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
if not dataset:
logging.info(click.style("Dataset is not found: {}".format(dataset_id), fg="yellow"))
return
- 租戶限額檢查
檢查租戶的功能限制:驗證批量上傳限制和向量空間限制,若超出限制,將所有文檔標記爲錯誤狀態。
features = FeatureService.get_features(dataset.tenant_id)
try:
if features.billing.enabled:
# 檢查批量上傳限制
# 檢查向量空間限制
except Exception as e:
# 處理錯誤:更新所有相關文檔的狀態爲錯誤
for document_id in document_ids:
document = db.session.query(Document).filter(...).first()
if document:
document.indexing_status = "error"
document.error = str(e)
document.stopped_at = datetime.datetime.now(...)
db.session.add(document)
db.session.commit()
return
- 文檔預處理
遍歷所有文檔 ID:查詢每個文檔,更新文檔狀態爲 "parsing",和記錄處理開始時間。
for document_id in document_ids:
logging.info(click.style("Start process document: {}".format(document_id), fg="green"))
document = db.session.query(Document).filter(...).first()
if document:
# 更新文檔狀態爲解析中
document.indexing_status = "parsing"
document.processing_started_at = datetime.datetime.now(...)
documents.append(document)
db.session.add(document)
db.session.commit()
- 針對 document 構建索引
創建 IndexingRunner 實例,運行文檔索引處理,並記錄處理耗時和處理可能的暫停錯誤和其他異常。
try:
indexing_runner = IndexingRunner()
indexing_runner.run(documents)
end_at = time.perf_counter()
logging.info(click.style("Processed dataset: {} latency: {}".format(
dataset_id, end_at - start_at), fg="green"))
except DocumentIsPausedError as ex:
logging.info(click.style(str(ex), fg="yellow"))
except Exception:
pass
索引構建:IndexingRunner.run
該函數的總體實現邏輯如下圖所示:
IndexingRunner
其中 run 函數的功能和實現步驟如下:
(1)遍歷傳入的 dataset_documents
列表,對每個文檔執行索引操作。
(2)從數據庫中獲取相應(通過數據集 id 來查詢)的數據集(Dataset),若不存在,停止處理拋出錯誤。
(3)獲取當前數據集和文檔的處理規則(Processing Rule)
(4)根據文檔類型(doc_form),選擇合適的索引處理器(IndexProcessor)。dify 提供了 4 中索引構建器,可以在 IndexType 類中查看這幾種索引構建器的名稱。
(5)使用 self._extract 函數根據文檔不同格式獲取文檔中的段落數據,並保存到 Document 對象中
(6)使用 self._transform
方法將提取的文本數據轉換爲內部使用的 Document
對象:把第(5)步獲取到的文檔數據進行進一步切分,切分成一個個的 chunk,每個 chunk 是一個 Document 對象。
(7)將提取和轉換後的文檔分段(chunk)信息保存到 DocumentSegment 表中。
(8)使用 self._load
根據文檔分段來構建索引,或文檔向量。
總結
通過以上分析已經知道了整個索引構建的全部流程。索引構建每個步驟的詳細實現邏輯在後面的文章中繼續分析。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/kgimAa77BKnAQD4XhIctvg