import { v5 as uuidv5 } from "uuid";
import { UUIDV5_NAMESPACE } from "./record_manager.js";
import { insecureHash } from "../utils/hash.js";
import { Document } from "../documents/document.js";
/**
 * HashedDocument is a Document with hashes calculated from its page
 * content and metadata. It is used for indexing.
 */
export class _HashedDocument {
    constructor(fields) {
        Object.defineProperty(this, "uid", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        Object.defineProperty(this, "hash_", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        Object.defineProperty(this, "contentHash", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        Object.defineProperty(this, "metadataHash", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        Object.defineProperty(this, "pageContent", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        Object.defineProperty(this, "metadata", {
            enumerable: true,
            configurable: true,
            writable: true,
            value: void 0
        });
        this.uid = fields.uid;
        this.pageContent = fields.pageContent;
        this.metadata = fields.metadata;
    }
    calculateHashes() {
        // Keys reserved for internal bookkeeping; reject metadata that
        // already uses them.
        const forbiddenKeys = ["hash_", "content_hash", "metadata_hash"];
        for (const key of forbiddenKeys) {
            if (key in this.metadata) {
                throw new Error(`Metadata cannot contain key ${key} as it is reserved for internal use. Restricted keys: [${forbiddenKeys.join(", ")}]`);
            }
        }
        const contentHash = this._hashStringToUUID(this.pageContent);
        try {
            const metadataHash = this._hashNestedDictToUUID(this.metadata);
            this.contentHash = contentHash;
            this.metadataHash = metadataHash;
        }
        catch (e) {
            throw new Error(`Failed to hash metadata: ${e}. Please use metadata that can be serialized with JSON.stringify.`);
        }
        this.hash_ = this._hashStringToUUID(this.contentHash + this.metadataHash);
        if (!this.uid) {
            this.uid = this.hash_;
        }
    }
    toDocument() {
        return new Document({
            pageContent: this.pageContent,
            metadata: this.metadata,
        });
    }
    static fromDocument(document, uid) {
        const doc = new this({
            pageContent: document.pageContent,
            metadata: document.metadata,
            uid: uid || document.uid,
        });
        doc.calculateHashes();
        return doc;
    }
    _hashStringToUUID(inputString) {
        const hash_value = insecureHash(inputString);
        return uuidv5(hash_value, UUIDV5_NAMESPACE);
    }
    _hashNestedDictToUUID(data) {
        // Sort object keys at every depth before serializing so logically
        // equal metadata always hashes identically. (An array replacer such
        // as Object.keys(data).sort() would silently drop nested keys that
        // do not also appear at the top level.)
        const serialized_data = JSON.stringify(data, (_key, value) => value !== null && typeof value === "object" && !Array.isArray(value)
            ? Object.keys(value)
                .sort()
                .reduce((acc, k) => {
                acc[k] = value[k];
                return acc;
            }, {})
            : value);
        const hash_value = insecureHash(serialized_data);
        return uuidv5(hash_value, UUIDV5_NAMESPACE);
    }
}
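// A minimal usage sketch (illustrative only, assuming the relative imports
// above resolve in your build): identical content and metadata produce the
// same deterministic uid, which is what makes deduplication possible.
//
//   const docA = _HashedDocument.fromDocument(
//     new Document({ pageContent: "hello", metadata: { source: "a.txt" } })
//   );
//   const docB = _HashedDocument.fromDocument(
//     new Document({ pageContent: "hello", metadata: { source: "a.txt" } })
//   );
//   console.assert(docA.uid === docB.uid); // identical inputs hash identically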
export function _batch(size, iterable) {
    const batches = [];
    let currentBatch = [];
    iterable.forEach((item) => {
        currentBatch.push(item);
        if (currentBatch.length >= size) {
            batches.push(currentBatch);
            currentBatch = [];
        }
    });
    if (currentBatch.length > 0) {
        batches.push(currentBatch);
    }
    return batches;
}
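// For illustration: batching splits an array into size-limited chunks,
// e.g. _batch(2, ["a", "b", "c"]) -> [["a", "b"], ["c"]].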
export function _deduplicateInOrder(hashedDocuments) {
    const seen = new Set();
    const deduplicated = [];
    for (const hashedDoc of hashedDocuments) {
        if (!hashedDoc.hash_) {
            throw new Error("Hashed document does not have a hash");
        }
        if (!seen.has(hashedDoc.hash_)) {
            seen.add(hashedDoc.hash_);
            deduplicated.push(hashedDoc);
        }
    }
    return deduplicated;
}
export function _getSourceIdAssigner(sourceIdKey) {
    if (sourceIdKey === null) {
        return (_doc) => null;
    }
    else if (typeof sourceIdKey === "string") {
        return (doc) => doc.metadata[sourceIdKey];
    }
    else if (typeof sourceIdKey === "function") {
        return sourceIdKey;
    }
    else {
        throw new Error(`sourceIdKey should be null, a string or a function, got ${typeof sourceIdKey}`);
    }
}
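// For illustration: the assigner extracts a stable "source id" per document,
// e.g. _getSourceIdAssigner("source")({ metadata: { source: "a.txt" } })
// returns "a.txt", while _getSourceIdAssigner(null) maps every document to
// null (which is why incremental cleanup requires a sourceIdKey).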
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const _isBaseDocumentLoader = (arg) => {
    if ("load" in arg &&
        typeof arg.load === "function" &&
        "loadAndSplit" in arg &&
        typeof arg.loadAndSplit === "function") {
        return true;
    }
    return false;
};
/**
 * Index data from the doc source into the vector store.
 *
 * Indexing functionality uses a manager to keep track of which documents
 * are in the vector store.
 *
 * This allows us to keep track of which documents were updated, which
 * documents were deleted, and which documents should be skipped.
 *
 * For the time being, documents are indexed using their hashes, and users
 * are not able to specify the uid of the document.
 *
 * @param {IndexArgs} args
 * @param {BaseDocumentLoader | DocumentInterface[]} args.docsSource The source of documents to index. Can be a DocumentLoader or a list of Documents.
 * @param {RecordManagerInterface} args.recordManager The record manager to use for keeping track of indexed documents.
 * @param {VectorStore} args.vectorStore The vector store to use for storing the documents.
 * @param {IndexOptions | undefined} args.options Options for indexing.
 * @returns {Promise<IndexingResult>}
 */
export async function index(args) {
    const { docsSource, recordManager, vectorStore, options } = args;
    const { batchSize = 100, cleanup, sourceIdKey, cleanupBatchSize = 1000, forceUpdate = false, } = options ?? {};
    if (cleanup === "incremental" && !sourceIdKey) {
        throw new Error("sourceIdKey is required when cleanup mode is incremental. Please provide through 'options.sourceIdKey'.");
    }
    const docs = _isBaseDocumentLoader(docsSource)
        ? await docsSource.load()
        : docsSource;
    const sourceIdAssigner = _getSourceIdAssigner(sourceIdKey ?? null);
    const indexStartDt = await recordManager.getTime();
    let numAdded = 0;
    let numDeleted = 0;
    let numUpdated = 0;
    let numSkipped = 0;
    const batches = _batch(batchSize ?? 100, docs);
    for (const batch of batches) {
        const hashedDocs = _deduplicateInOrder(batch.map((doc) => _HashedDocument.fromDocument(doc)));
        const sourceIds = hashedDocs.map((doc) => sourceIdAssigner(doc));
        if (cleanup === "incremental") {
            // Incremental cleanup groups records by source, so every document
            // must resolve to a non-null source id.
            hashedDocs.forEach((_hashedDoc, i) => {
                const source = sourceIds[i];
                if (source === null) {
                    throw new Error("sourceIdKey must be provided when cleanup is incremental");
                }
            });
        }
        const batchExists = await recordManager.exists(hashedDocs.map((doc) => doc.uid));
        const uids = [];
        const docsToIndex = [];
        const docsToUpdate = [];
        const seenDocs = new Set();
        hashedDocs.forEach((hashedDoc, i) => {
            const docExists = batchExists[i];
            if (docExists) {
                if (forceUpdate) {
                    seenDocs.add(hashedDoc.uid);
                }
                else {
                    // Already indexed and unchanged: just refresh its timestamp
                    // in the record manager and count it as skipped.
                    docsToUpdate.push(hashedDoc.uid);
                    return;
                }
            }
            uids.push(hashedDoc.uid);
            docsToIndex.push(hashedDoc.toDocument());
        });
        if (docsToUpdate.length > 0) {
            await recordManager.update(docsToUpdate, { timeAtLeast: indexStartDt });
            numSkipped += docsToUpdate.length;
        }
        if (docsToIndex.length > 0) {
            await vectorStore.addDocuments(docsToIndex, { ids: uids });
            numAdded += docsToIndex.length - seenDocs.size;
            numUpdated += seenDocs.size;
        }
        await recordManager.update(hashedDocs.map((doc) => doc.uid), { timeAtLeast: indexStartDt, groupIds: sourceIds });
        if (cleanup === "incremental") {
            sourceIds.forEach((sourceId) => {
                if (!sourceId)
                    throw new Error("Source id cannot be null");
            });
            // Delete records from the same sources that were written before
            // this indexing run started; they are now stale.
            const uidsToDelete = await recordManager.listKeys({
                before: indexStartDt,
                groupIds: sourceIds,
            });
            if (uidsToDelete.length > 0) {
                await vectorStore.delete({ ids: uidsToDelete });
                await recordManager.deleteKeys(uidsToDelete);
                numDeleted += uidsToDelete.length;
            }
        }
    }
    if (cleanup === "full") {
        // Full cleanup deletes, in batches, every record that was not touched
        // during this run, regardless of source.
        let uidsToDelete = await recordManager.listKeys({
            before: indexStartDt,
            limit: cleanupBatchSize,
        });
        while (uidsToDelete.length > 0) {
            await vectorStore.delete({ ids: uidsToDelete });
            await recordManager.deleteKeys(uidsToDelete);
            numDeleted += uidsToDelete.length;
            uidsToDelete = await recordManager.listKeys({
                before: indexStartDt,
                limit: cleanupBatchSize,
            });
        }
    }
    return {
        numAdded,
        numDeleted,
        numUpdated,
        numSkipped,
    };
}
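// A minimal usage sketch, not part of this module. The record manager and
// vector store names below are hypothetical stand-ins for any
// RecordManagerInterface and VectorStore implementations, and `embeddings`
// is assumed to be an existing Embeddings instance:
//
//   const recordManager = new InMemoryRecordManager(); // hypothetical
//   const vectorStore = new MemoryVectorStore(embeddings); // hypothetical
//   const docs = [
//     new Document({ pageContent: "hello", metadata: { source: "a.txt" } }),
//   ];
//   const result = await index({
//     docsSource: docs,
//     recordManager,
//     vectorStore,
//     options: { cleanup: "incremental", sourceIdKey: "source" },
//   });
//   // On a fresh store:
//   // result -> { numAdded: 1, numDeleted: 0, numUpdated: 0, numSkipped: 0 }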