ingFang SC", system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;color: rgb(31, 35, 41);margin: 0px 0px 4px;word-break: break-all;min-height: 20px;"> ingFang SC", system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;color: rgb(31, 35, 41);margin: 0px 0px 4px;word-break: break-all;min-height: 20px;">本指南面向希望为 RAGFlow 扩展数据源能力的社区开发者,旨在以专业、可复用的流程说明如何实现并接入新的 connector。RAGFlow 的 connector 框架深受Onyx 开源项目启发,特此致谢。ingFang SC", system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;color: rgb(31, 35, 41);margin: 0px 0px 4px;word-break: break-all;min-height: 20px;">在实际使用中,除了从本地文件系统导入文件,RAGFlow还需要从大量异构系统中获取数据。ingFang SC", system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;color: rgb(31, 35, 41);margin: 0px 0px 4px;word-break: break-all;min-height: 20px;"> ingFang SC", system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;color: rgb(31, 35, 41);margin: 0px 0px 4px;word-break: break-all;min-height: 20px;">为此,引入了统一的 datasource / connector组件:每一种外部数据源通过一个 connector完成“认证、拉取文档、增量更新以及按 checkpoint续传”等一系列动作,从而让上层检索与问答逻辑完全解耦于底层数据源。ingFang SC", system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;color: rgb(31, 35, 41);margin: 0px 0px 4px;word-break: break-all;min-height: 20px;"> ingFang SC", system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;color: rgb(31, 35, 41);margin: 0px 0px 4px;word-break: break-all;min-height: 20px;">围绕这一目标,RAGFlow 暴露了四个主要接口,覆盖了从简单到复杂的大部分场景:ingFang SC", system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;font-size: 15px;line-height: 30px;padding-left: 20px;" class="list-paddingleft-1">LoadConnector:定义全量同步(full sync,load_from_state)的行为,用于重建或恢复完整数据视图。PollConnector:定义增量同步(incremental sync,poll_source)的行为,用于在已有数据基础上只拉取变化。CheckpointedConnector:在需要分页或“断点续传”时,通过load_from_checkpoint+ConnectorCheckpoint显式管理游标与进度。CheckpointedConnectorWithPermSync:在需要同时考虑内容与权限(permission)时,在 checkpoint 流中附带权限与失败信息,支持更精细的同步与重试策略。ingFang SC", system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;color: rgb(31, 35, 41);margin: 0px 0px 4px;word-break: break-all;min-height: 20px;"> ingFang SC", system-ui, -apple-system, BlinkMacSystemFont, "Helvetica Neue", "Hiragino Sans GB", "Microsoft YaHei UI", "Microsoft YaHei", Arial, sans-serif;color: rgb(31, 35, 41);margin: 0px 0px 4px;word-break: break-all;min-height: 20px;">对于“结构简单”的数据源(例如对象存储、单一 API 拉取),通常只需实现LoadConnector和PollConnector即可完成接入;
对于需要分页、断点续扫或权限同步的“复杂数据源”(例如 Confluence,Jira,Google Drive),可以进一步实现CheckpointedConnector或CheckpointedConnectorWithPermSync。
只要按照本文说明实现并接好这四类接口,就可以将你的数据源无缝接入 RAGFlow。
本文共有五个部分:体系概览、核心抽象接口、实现流程、接入 SyncBase 的示例解析,以及一个交付检查清单。
- 1.目录结构:各类数据源代码位于common/data_source/,复杂源(如 Jira)会拆分为子包。公共工具位于common/data_source/utils.py。
- 2.调度入口:rag/svr/sync_data_source.py通过 Trio 协程调度同步任务,并根据FileSource选择具体实现。
- 3.常量定义:common/constants.py中的FileSource与common/data_source/config.py中的DocumentSource分别决定任务标识和文档来源标签。新增数据源时必须同步更新。
整体结构可以抽象为三层: - Sync 层:rag/svr/sync_data_source.py中的SyncBase及其子类(例如S3、Jira),负责根据任务配置决定何时调用load_from_state/poll_source/load_from_checkpoint,并将批次写入知识库。
- Connector 实现层:common/data_source/下具体的 connector 类(例如BlobStorageConnector、JiraConnector),实现LoadConnector、PollConnector和/或CheckpointedConnector(WithPermSync)接口,专注于“如何从外部系统取数并组装为 Document”。
- 基础服务层:SyncLogsService与KnowledgebaseService,分别负责同步日志统计与文档入库。
从贡献者视角,可以简单地理解为: - 在 connector 层实现好“如何产生Document”(含凭证、分页、checkpoint 等细节);
- 在 Sync 层实现好“何时调用 connector、如何把批次交给基础服务”,并按照第 3 节的流程将新 connector 挂接进整个体系。
SyncBase是调度流程的核心。所有数据源的执行逻辑都会在__call__中被统一处理。一般不需要被修改。
SyncBase 负责统一的批量写入、日志与 checkpoint 更新,而_generate()由各数据源实现,负责返回Iterable[list[Document]]。 classSyncBase: SOURCE_NAME:str=None
asyncdef__call__(self, task:dict): ... asyncdef_generate(self, task:dict): raiseNotImplementedError
文档模型 所有 connector 必须产出Document。doc_updated_at必须是 UTC 时间,以保证增量同步精度。
classDocument(BaseModel): id:str source:str semantic_identifier:str extension:str blob:bytes doc_updated_at: datetime size_bytes:int
- id:在同一数据源内必须唯一,并且在多次同步中尽量保持不变。常见形式是"source_type:business_key"。
- semantic_identifier:尽量包含标题、作者或位置等关键信息,方便检索。
同步接口:全量同步(Load)与增量同步(Poll)
- 全量同步(Load,load_from_state):
- 用于重建当前 connector 下的“完整视图”,通常在任务reindex == "1"或首次同步时触发。实现上,一般会从“时间起点”(如 1970-01-01)扫到当前时间,将符合条件的所有文档分批yield list[Document]。在这一模式下,connector 不需要关心上一次同步到哪里,只需要确保能完整遍历外部系统中应被索引的对象。
classLoadConnector(ABC): @abstractmethod defload_credentials(self, credentials ict[str,Any]) ->Dict[str,Any] |None: ...
@abstractmethod defload_from_state(self) -> Generator[list[Document],None,None]: """load all documents up to now""" ...
@abstractmethod defvalidate_connector_settings(self) ->None: ...
- 增量同步(Poll,poll_source):
- 用于在已有数据基础上“只拉变化”,避免重复处理历史内容。系统会为每次任务提供一个时间窗口(start, end](例如从上一次成功同步的时间到现在),connector 应当只返回在该窗口内新增或更新过的文档。实现时,应严格遵守(start, end]条件,并依赖Document.doc_updated_at的准确性,防止遗漏或重复。
classPollConnector(ABC): @abstractmethod defpoll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> Generator[list[Document],None,None]: """load documents from start to end""" ...
下面的代码展示了 S3 对应的具体实现: classS3(SyncBase): SOURCE_NAME:str= FileSource.S3
asyncdef_generate(self, task:dict): self.connector = BlobStorageConnector( bucket_type=self.conf.get("bucket_type","s3"), bucket_name=self.conf["bucket_name"], prefix=self.conf.get("prefix","") ) self.connector.load_credentials(self.conf["credentials"]) document_batch_generator = ( self.connector.load_from_state() iftask["reindex"] =="1"ornottask["poll_range_start"] elseself.connector.poll_source( task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp() ) ) returndocument_batch_generator
对应的 connector 的实现: classBlobStorageConnector(LoadConnector, PollConnector): defload_from_state(self) -> GenerateDocumentsOutput: returnself._yield_blob_objects( start=datetime(1970,1,1, tzinfo=timezone.utc), end=datetime.now(timezone.utc), )
defpoll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput: start_datetime = datetime.fromtimestamp(start, tz=timezone.utc) end_datetime = datetime.fromtimestamp(end, tz=timezone.utc) forbatchinself._yield_blob_objects(start_datetime, end_datetime): yieldbatch
defvalidate_connector_settings(self) ->None: ...
可以看到: - 调度层(S3._generate)只关心“全量 vs 增量”的切换。
- connector 层通过统一的_yield_blob_objects分别实现“从起点到当前”的一次性加载与“窗口内”的滚动拉取。
Checkpoint 支持对于只能分页读取或需要“断点续扫”的系统,推荐使用 checkpoint 抽象来管理游标状态。这里的 checkpoint 可以理解为“本轮同步结束时的游标快照”,通常包含: - 当前分页位置或 offset(例如某一页的起始索引)
- 已处理 ID 集合或光标(cursor),用来避免重复拉取
- 是否还有剩余数据的标记(如has_more)
典型的使用场景包括: - 外部 API 只支持分页访问,且每页大小有限(如 Jira、Confluence、Google Drive 等)
- 同步过程可能被中断,需要从上一次完成的位置继续,而不是从头重跑
- 希望按较小的批次反复调用 connector,每次只处理一部分数据
在这类场景下,每次调用load_from_checkpoint都会: - 1.基于传入的 checkpoint(上一轮的游标快照)决定本轮要读取的数据范围
- 2.逐个返回Document或ConnectorFailure
- 3.在生成器结束时返回一个“新的 checkpoint”,供下一轮调用使用
当前在 RAGFlow 中主要有两种典型用法,也对应了“简单 connector”与“复杂 connector”的典型划分:
相对简单的CheckpointedConnector
以 Confluence 为例对于“相对简单”的 connector(只需要按时间和分页遍历内容,不关心权限,也不需要把复杂失败信息编码进 checkpoint),通常实现CheckpointedConnector或甚至只实现LoadConnector/PollConnector即可,例如 Confluence 内容拉取或 S3 这类存储源。这类 connector 的关注点是“把所有需要索引的对象可靠地遍历一遍”。
接口定义如下: classCheckpointedConnector(BaseConnector[CT]): @abc.abstractmethod defload_from_checkpoint( self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch, checkpoint: CT, ) -> CheckpointOutput[CT]: ...
@abc.abstractmethod defbuild_dummy_checkpoint(self) -> CT: ...
@abc.abstractmethod defvalidate_checkpoint_json(self, checkpoint_json:str) -> CT: ...
Confluence 的实现只关注“遍历内容”,不在 checkpoint 中携带权限信息: classConfluenceCheckpoint(ConnectorCheckpoint): next_page_url:str|None
classConfluenceConnector( CheckpointedConnector[ConfluenceCheckpoint], SlimConnector, SlimConnectorWithPermSync, CredentialsConnector, ): defload_from_checkpoint( self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch, checkpoint: ConfluenceCheckpoint, ) -> CheckpointOutput[ConfluenceCheckpoint]: end += ONE_DAY # handle time zone weirdness try: returnself._fetch_document_batches(checkpoint, start, end) exceptExceptionase: ...
defbuild_dummy_checkpoint(self) -> ConfluenceCheckpoint: returnConfluenceCheckpoint(has_more=True, next_page_url=None)
defvalidate_checkpoint_json(self, checkpoint_json:str) -> ConfluenceCheckpoint: returnConfluenceCheckpoint.model_validate_json(checkpoint_json)
配合工具函数可以一次性加载或增量加载文档: fordocinload_all_docs_from_checkpoint_connector( connector=confluence_connector, start=start, end=end, ): print(doc)
相对复杂的CheckpointedConnectorWithPermSync
以 Jira 为例对于“相对复杂”的 connector(需要结合权限、失败记录、外部系统特有游标等信息),更推荐实现CheckpointedConnectorWithPermSync,并在 checkpoint 中显式记录分页游标、剩余状态等,例如 Jira,Google Drive,Slack,Teams。这类 connector 的关注点除了内容本身,还包括“谁能看到什么”和“哪些对象在某一轮中失败了,需要后续重试”。
接口如下: classCheckpointedConnectorWithPermSync(ABC): @abstractmethod defload_from_checkpoint( self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch, checkpoint: ConnectorCheckpoint, ) -> Generator[Document | ConnectorFailure,None, ConnectorCheckpoint]: ...
@abstractmethod defload_from_checkpoint_with_perm_sync( self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch, checkpoint: ConnectorCheckpoint, ) -> Generator[Document | ConnectorFailure,None, ConnectorCheckpoint]: ...
@abstractmethod defbuild_dummy_checkpoint(self) -> ConnectorCheckpoint: ...
@abstractmethod defvalidate_checkpoint_json(self, checkpoint_json:str) -> ConnectorCheckpoint: ...
Jira 同时需要内容与权限/元数据同步,因此实现了带权限的 checkpoint 接口,并使用专门的 checkpoint 类型来跟踪分页状态: classJiraCheckpoint(ConnectorCheckpoint): """Checkpoint that tracks which slice of the current JQL result set was emitted."""
start_at:int=0 cursor:str|None=None ids_done:bool=False all_issue_ids:list[list[str]]
classJiraConnector(CheckpointedConnectorWithPermSync): ...
在调度侧,rag/svr/sync_data_source.py显式编写 checkpoint 循环,利用CheckpointOutputWrapper统一处理成功与失败: defdocument_batches(): checkpoint =self.connector.build_dummy_checkpoint() pending_docs = [] ... whilecheckpoint.has_more: wrapper = CheckpointOutputWrapper() generator = wrapper( self.connector.load_from_checkpoint( start_time, end_time, checkpoint, ) ) fordocument, failure, next_checkpointingenerator: iffailureisnotNone: continue ifdocumentisnotNone: pending_docs.append(document) iflen(pending_docs) >= batch_size: yieldpending_docs pending_docs = [] ifnext_checkpointisnotNone: checkpoint = next_checkpoint ... ifpending_docs: yieldpending_docs
对贡献者的建议: - 仅内容同步时,可参考 Confluence,实现CheckpointedConnector并复用load_all_docs_from_checkpoint_connector。
参考https://github.com/infiniflow/ragflow/issues/11376
- 内容与权限同步或复杂错误策略时,可参考 Jira,实现CheckpointedConnectorWithPermSync并在sync_data_source.py中编写借助CheckpointOutputWrapper显式编写 checkpoint 循环。
参考https://github.com/infiniflow/ragflow/blob/bd4bc57009fe2990b3be1000564a4d5559477cfc/rag/svr/sync_data_source.py#L351
最小交付要求如下: 通过以上六步即可得到一个结构完整、可被调度器识别的最小实现。
以下示例展示了如何在SyncBase._generate中接入不同类型的 connector,并从调度逻辑到批量产出Document的完整链路,可作为实现接入步骤的参考。
S3 Blob Storage 示例SyncBase 中的调度逻辑 classS3(SyncBase): SOURCE_NAME:str= FileSource.S3
asyncdef_generate(self, task:dict): self.connector = BlobStorageConnector( bucket_type=self.conf.get("bucket_type","s3"), bucket_name=self.conf["bucket_name"], prefix=self.conf.get("prefix","") ) self.connector.load_credentials(self.conf["credentials"]) document_batch_generator = ( self.connector.load_from_state() iftask["reindex"] =="1"ornottask["poll_range_start"] elseself.connector.poll_source( task["poll_range_start"].timestamp(), datetime.now(timezone.utc).timestamp() ) ) begin_info ="totally"iftask["reindex"] =="1"ornottask["poll_range_start"]elsef"from{task['poll_range_start']}" logging.info( f"Connect to S3:{self.conf['bucket_name']}/" f"{self.conf.get('prefix','')}{begin_info}" ) returndocument_batch_generator
Connector 逻辑 classBlobStorageConnector(LoadConnector, PollConnector): def__init__(self, bucket_type:str, bucket_name:str, prefix:str="", batch_size:int= INDEX_BATCH_SIZE, european_residency:bool=False) ->None: self.bucket_type: BlobType = BlobType(bucket_type) self.bucket_name = bucket_name.strip() self.prefix = prefixifnotprefixorprefix.endswith("/")elseprefix +"/" self.batch_size = batch_size self.s3_client:Optional[Any] =None
defload_credentials(self, credentials:dict[str,Any]) ->dict[str,Any] |None: ifself.bucket_type == BlobType.S3: authentication_method = credentials.get("authentication_method","access_key") ifauthentication_method =="access_key": ifnotall(credentials.get(key)forkeyin["aws_access_key_id","aws_secret_access_key"]): raiseConnectorMissingCredentialError("Amazon S3") elifauthentication_method =="iam_role": ifnotcredentials.get("aws_role_arn"): raiseConnectorMissingCredentialError("Amazon S3 IAM role ARN is required") ... self.s3_client = create_s3_client(self.bucket_type, credentials,self.european_residency) returnNone
def_yield_blob_objects(self, start: datetime, end: datetime) -> GenerateDocumentsOutput: paginator =self.s3_client.get_paginator("list_objects_v2") pages = paginator.paginate(Bucket=self.bucket_name, Prefix=self.prefix) batch:list[Document] = [] forpageinpages: forobjinpage.get("Contents", []): ... file_name = os.path.basename(obj["Key"]) blob = download_object(self.s3_client,self.bucket_name, obj["Key"],self.size_threshold) ifblobisNone: continue batch.append(Document( id=f"{self.bucket_type}:{self.bucket_name}:{obj['Key']}", blob=blob, source=DocumentSource(self.bucket_type.value), semantic_identifier=file_name, extension=get_file_ext(file_name), doc_updated_at=last_modified, size_bytes=extract_size_bytes(obj)or0 )) iflen(batch) ==self.batch_size: yieldbatch ifbatch: yieldbatch
实现要点 - 1.SyncBase 层只负责根据任务配置在“全量/增量”之间切换,实际抓取与分页逻辑全部封装在 connector 内。
- 2.Document.id使用bucket_type:bucket:key组合,天然避免重复。
- 3._yield_blob_objects同时服务于load_from_state与poll_source,可确保两种模式行为一致。
- 1.FileSource与DocumentSource已添加对应条目。
- 2.load_credentials()校验了所有必需字段,如涉及 OAuth 亦支持凭证刷新与持久化。
- 3.load_from_state()与poll_source()在适用时共享批处理实现,或在行为不完全一致时有清晰的设计理由和文档说明。
- 4.生成的Document填写了id、semantic_identifier、extension、doc_updated_at(UTC)及size_bytes。
- 5.SyncBase子类根据task["reindex"]正确切换同步模式,并输出明确日志。
- 6.涉及 checkpoint/分页的实现具备终止条件或迭代上限,避免无限循环。
- 7.已完成至少一次全量同步与一次增量同步的本地验证,并在 PR 中注明参考 Onyx 的实现要点。
完成上述检查后,即可提交 PR。期待你的贡献帮助 RAGFlow 覆盖更多企业数据源,与社区共同构建稳健的知识底座。 |