Osmi
Использование OSMI-AI

Операция обновления или вставки данных

Upsert — это процесс загрузки и обработки документов в БД векторов, который служит основой для систем RAG (Retrieval Augmented Generation)

Существует два основных способа выполнения upsert (операция обновления или вставки данных) в БД векторов:

  • База документов (рекомендуется)
  • Upsert (операция обновления или вставки данных) через Агента

Мы настоятельно рекомендуем использовать базу документов, так как они предоставляют единый интерфейс для работы с RAG-конвейерами — получение данных из разных источников, стратегия разбиения на части (chunking), загрузка в векторную базу данных, синхронизация с обновлёнными данными. В этом руководстве мы рассмотрим другой метод — Upsert через Агента. Это более старая методика, применявшаяся до появления базы документов.

Подробности см. в документации к Vector Upsert Endpoint API.

Понимание процесса обновления или вставки данных

Агент позволяет создавать поток, который может выполнять как обновление/вставку данных (upserting), так и процесс запросов с использованием RAG (Retrieval-Augmented Generation). Оба эти процесса могут работать независимо друг от друга.

Настройка

Чтобы выполнить процесс обновления или вставки данных (upsert), необходимо создать поток с пятью различными узлами:

- Загрузчик документов (Document Loader)

- Разделитель текста (Text Splitter)

- Модель векторных вложений (Embedding Model)

- БД векторов (Vector Store)

- БД записей (Record Manager) — опционально

Все эти элементы подробнее описаны в разделе «База документов» — обращайтесь туда за дополнительной информацией.

После правильной настройки потока в правом верхнем углу появится зеленая кнопка, которая позволяет запустить процесс обновления/вставки.

Также процесс upsert можно выполнить через API:

Базовый URL и аутентификация

Методы запроса

API поддерживает два типа методов в зависимости от конфигурации вашего agent:

  1. Form Data (загрузка файлов) Используется, если ваш аге содержит загрузчики документов с возможностью загрузки файлов.
  1. JSON Body (без загрузки файлов) Используется, если загрузчики документов не требуют загрузки файлов (например, веб-скраперы, коннекторы баз данных).

Чтобы переопределить настройки узлов, такие как файлы, метаданные и т. д., необходимо явно включить соответствующую опцию.

Загрузчики документов с возможностью загрузки файлов

Поддерживаемые типы документов

Загрузчик документовТипы файлов
CSV File.csv
Docx/Word File.docx
JSON File.json
JSON Lines File.jsonl
PDF File.pdf
Text File.txt
Excel File.xlsx
Powerpoint File.pptx
File LoaderMultiple
Unstructured FileMultiple

Важно: Убедитесь, что тип файла соответствует настройкам вашего загрузчика документов. Для максимальной гибкости рекомендуется использовать загрузчик File Loader, который поддерживает несколько типов файлов.

Формат запроса

При загрузке файлов используйте multipart/form-data вместо JSON.

Примеры

import requests
import os

def upsert_document(agent_id, file_path, config=None):
    """
    Upsert a single document to a vector store.
    
    Args:
        agent_id (str): The agent ID configured for vector upserting
        file_path (str): Path to the file to upload
        return_source_docs (bool): Whether to return source documents in response
        config (dict): Optional configuration overrides
    
    Returns:
        dict: API response containing upsert results
    """
    url = f"http://localhost:3000/api/v1/vector/upsert/{agent_id}"
    
    # Prepare file data
    files = {
        'files': (os.path.basename(file_path), open(file_path, 'rb'))
    }
    
    # Prepare form data
    data = {}
    
    # Add configuration overrides if provided
    if config:
        data['overrideConfig'] = str(config).replace("'", '"')  # Convert to JSON string
    
    try:
        response = requests.post(url, files=files, data=data)
        response.raise_for_status()
        
        return response.json()
        
    except requests.exceptions.RequestException as e:
        print(f"Upload failed: {e}")
        return None
    finally:
        # Always close the file
        files['files'][1].close()

# Example usage
result = upsert_document(
    agent_id="your-agent-id",
    file_path="documents/knowledge_base.pdf",
    config={
        "chunkSize": 1000,
        "chunkOverlap": 200
    }
)

if result:
    print(f"Successfully upserted {result.get('numAdded', 0)} chunks")
    if result.get('sourceDocuments'):
        print(f"Source documents: {len(result['sourceDocuments'])}")
else:
    print("Upload failed")
class VectorUploader {
    constructor(baseUrl = 'http://localhost:3000') {
        this.baseUrl = baseUrl;
    }
    
    async upsertDocument(agentId, file, config = {}) {
        /**
         * Upload a file to vector store from browser
         * @param {string} agentId - The agent ID
         * @param {File} file - File object from input element
         * @param {Object} config - Optional configuration
         */
        
        const formData = new FormData();
        formData.append('files', file);
        
        if (config.overrideConfig) {
            formData.append('overrideConfig', JSON.stringify(config.overrideConfig));
        }
        
        try {
            const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${agentId}`, {
                method: 'POST',
                body: formData
            });
            
            if (!response.ok) {
                throw new Error(`HTTP error! status: ${response.status}`);
            }
            
            const result = await response.json();
            return result;
            
        } catch (error) {
            console.error('Upload failed:', error);
            throw error;
        }
    }
    
  
}

// Example usage in browser
const uploader = new VectorUploader();

// Single file upload
document.getElementById('fileInput').addEventListener('change', async function(e) {
    const file = e.target.files[0];
    if (file) {
        try {
            const result = await uploader.upsertDocument(
                'your-agent-id',
                file,
                {
                    overrideConfig: {
                        chunkSize: 1000,
                        chunkOverlap: 200
                    }
                }
            );
            
            console.log('Upload successful:', result);
            alert(`Successfully processed ${result.numAdded || 0} chunks`);
            
        } catch (error) {
            console.error('Upload failed:', error);
            alert('Upload failed: ' + error.message);
        }
    }
});
const fs = require('fs');
const path = require('path');
const FormData = require('form-data');
const fetch = require('node-fetch');

class NodeVectorUploader {
    constructor(baseUrl = 'http://localhost:3000') {
        this.baseUrl = baseUrl;
    }
    
    async upsertDocument(agentId, filePath, config = {}) {
        /**
         * Upload a file to vector store from Node.js
         * @param {string} agentId - The agent ID
         * @param {string} filePath - Path to the file
         * @param {Object} config - Optional configuration
         */
        
        if (!fs.existsSync(filePath)) {
            throw new Error(`File not found: ${filePath}`);
        }
        
        const formData = new FormData();
        const fileStream = fs.createReadStream(filePath);
        
        formData.append('files', fileStream, {
            filename: path.basename(filePath),
            contentType: this.getMimeType(filePath)
        });
        
        if (config.overrideConfig) {
            formData.append('overrideConfig', JSON.stringify(config.overrideConfig));
        }
        
        try {
            const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${agentId}`, {
                method: 'POST',
                body: formData,
                headers: formData.getHeaders()
            });
            
            if (!response.ok) {
                const errorText = await response.text();
                throw new Error(`HTTP ${response.status}: ${errorText}`);
            }
            
            return await response.json();
            
        } catch (error) {
            console.error('Upload failed:', error);
            throw error;
        }
    }

    getMimeType(filePath) {
        const ext = path.extname(filePath).toLowerCase();
        const mimeTypes = {
            '.pdf': 'application/pdf',
            '.txt': 'text/plain',
            '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            '.csv': 'text/csv',
            '.json': 'application/json'
        };
        return mimeTypes[ext] || 'application/octet-stream';
    }
}

// Example usage
async function main() {
    const uploader = new NodeVectorUploader();
    
    try {
        // Single file upload
        const result = await uploader.upsertDocument(
            'your-agent-id',
            './documents/manual.pdf',
            {
                overrideConfig: {
                    chunkSize: 1200,
                    chunkOverlap: 100
                }
            }
        );
        
        console.log('Single file upload result:', result); 
    } catch (error) {
        console.error('Process failed:', error);
    }
}

// Run if this file is executed directly
if (require.main === module) {
    main();
}

module.exports = { NodeVectorUploader };
# Basic file upload with cURL
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
  -F "files=@documents/knowledge_base.pdf"

# File upload with configuration override
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
  -F "files=@documents/manual.pdf" \
  -F 'overrideConfig={"chunkSize": 1000, "chunkOverlap": 200}'

# Upload with custom headers for authentication (if configured)
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
  -H "Authorization: Bearer your-api-token" \
  -F "files=@documents/faq.txt" \
  -F 'overrideConfig={"chunkSize": 800, "chunkOverlap": 150}'

Загрузчики документов без загрузки файлов

Для загрузчиков документов, которые не требуют загрузки файлов (например, веб-скрейперы, соединители с базами данных, интеграции API), используйте формат JSON, аналогичный API для предсказаний.

Примеры

import requests
from typing import Dict, Any, Optional

def upsert(agent_id: str, config: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
    """
    Trigger vector upserting for agents that don't require file uploads.
    
    Args:
        agent_id: The agent ID configured for vector upserting
        config: Optional configuration overrides
    
    Returns:
        API response containing upsert results
    """
    url = f"http://localhost:3000/api/v1/vector/upsert/{agent_id}"
    
    payload = {
        "overrideConfig": config
    }
    
    headers = {
        "Content-Type": "application/json"
    }
    
    try:
        response = requests.post(url, json=payload, headers=headers, timeout=300)
        response.raise_for_status()
        
        return response.json()
        
    except requests.exceptions.RequestException as e:
        print(f"Upsert failed: {e}")
        return None

result = upsert(
    agent_id="agent-id",
    config={
        "chunkSize": 800,
        "chunkOverlap": 100,
    }
)

if result:
    print(f"Upsert completed: {result.get('numAdded', 0)} chunks added")
class NoFileUploader {
    constructor(baseUrl = 'http://localhost:3000') {
        this.baseUrl = baseUrl;
    }
    
    async upsertWithoutFiles(agentId, config = {}) {
        /**
         * Trigger vector upserting for flows that don't need file uploads
         * @param {string} agentId - The agent ID
         * @param {Object} config - Configuration overrides
         */
        
        const payload = {
            overrideConfig: config
        };
        
        try {
            const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${agentId}`, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify(payload)
            });
            
            if (!response.ok) {
                throw new Error(`HTTP error! status: ${response.status}`);
            }
            
            return await response.json();
            
        } catch (error) {
            console.error('Upsert failed:', error);
            throw error;
        }
    }
    
    async scheduledUpsert(agentId, interval = 3600000) {
        /**
         * Set up scheduled upserting for dynamic content sources
         * @param {string} agentId - The agent ID
         * @param {number} interval - Interval in milliseconds (default: 1 hour)
         */
        
        console.log(`Starting scheduled upsert every ${interval/1000} seconds`);
        
        const performUpsert = async () => {
            try {
                console.log('Performing scheduled upsert...');
                
                const result = await this.upsertWithoutFiles(agentId, {
                    addMetadata: {
                        scheduledUpdate: true,
                        timestamp: new Date().toISOString()
                    }
                });
                
                console.log(`Scheduled upsert completed: ${result.numAdded || 0} chunks processed`);
                
            } catch (error) {
                console.error('Scheduled upsert failed:', error);
            }
        };
        
        // Perform initial upsert
        await performUpsert();
        
        // Set up recurring upserts
        return setInterval(performUpsert, interval);
    }
}

// Example usage
const uploader = new NoFileUploader();

async function performUpsert() {
    try {
        const result = await uploader.upsertWithoutFiles(
            'agent-id',
            {
                chunkSize: 800,
                chunkOverlap: 100
            }
        );
        
        console.log('Upsert result:', result);
        
    } catch (error) {
        console.error('Upsert failed:', error);
    }
}

// One time upsert
await performUpsert();

// Set up scheduled updates (every 30 minutes)
const schedulerHandle = await uploader.scheduledUpsert(
    'dynamic-content-agent-id',
    30 * 60 * 1000
);

// To stop scheduled updates later:
// clearInterval(schedulerHandle);
# Basic upsert with cURL
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
  -H "Content-Type: application/json"

# Upsert with configuration override
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
  -H "Content-Type: application/json" \
  -d '{
    "overrideConfig": {
      "returnSourceDocuments": true
    }
  }'
  
# Upsert with custom headers for authentication (if configured)
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
  -H "Authorization: Bearer your-api-token" \
  -H "Content-Type: application/json"

Поля ответа

FieldTypeDescription
numAddednumberКоличество новых чанков, добавленных в векторное хранилище
numDeletednumberКоличество удалённых чанков (если используется Record Manager)
numSkippednumberКоличество пропущенных чанков (если используется Record Manager)
numUpdatednumberКоличество обновлённых существующих чанков (если есть Record Manager)

Стратегии оптимизации

1. Пакетная обработка

def intelligent_batch_processing(files: List[str], agent_id: str) -> Dict[str, Any]:
    """Process files in optimized batches based on size and type."""
    
    # Group files by size and type
    small_files = []
    large_files = []
    
    for file_path in files:
        file_size = os.path.getsize(file_path)
        if file_size > 5_000_000:  # 5MB
            large_files.append(file_path)
        else:
            small_files.append(file_path)
    
    results = {'successful': [], 'failed': [], 'totalChunks': 0}
    
    # Process large files individually
    for file_path in large_files:
        print(f"Processing large file: {file_path}")
        # Individual processing with custom config
        # ... implementation
    
    # Process small files in batches
    batch_size = 5
    for i in range(0, len(small_files), batch_size):
        batch = small_files[i:i + batch_size]
        print(f"Processing batch of {len(batch)} small files")
        # Batch processing
        # ... implementation
    
    return results

2. Оптимизация метаданных

import requests
import os
from datetime import datetime
from typing import Dict, Any

def upsert_with_optimized_metadata(agent_id: str, file_path: str, 
                                 department: str = None, category: str = None) -> Dict[str, Any]:
    """
    Upsert document with automatically optimized metadata.
    """
    url = f"http://localhost:3000/api/v1/vector/upsert/{agent_id}"
    
    # Generate optimized metadata
    custom_metadata = {
        'department': department or 'general',
        'category': category or 'documentation',
        'indexed_date': datetime.now().strftime('%Y-%m-%d'),
        'version': '1.0'
    }
    
    optimized_metadata = optimize_metadata(file_path, custom_metadata)
    
    # Prepare request
    files = {'files': (os.path.basename(file_path), open(file_path, 'rb'))}
    data = {
        'overrideConfig': str({
            'metadata': optimized_metadata
        }).replace("'", '"')
    }
    
    try:
        response = requests.post(url, files=files, data=data)
        response.raise_for_status()
        return response.json()
    finally:
        files['files'][1].close()

# Example usage with different document types
results = []

# Technical documentation
tech_result = upsert_with_optimized_metadata(
    agent_id="tech-docs-agent",
    file_path="docs/api_reference.pdf",
    department="engineering",
    category="technical_docs"
)
results.append(tech_result)

# HR policies
hr_result = upsert_with_optimized_metadata(
    agent_id="hr-docs-agent", 
    file_path="policies/employee_handbook.pdf",
    department="human_resources",
    category="policies"
)
results.append(hr_result)

# Marketing materials
marketing_result = upsert_with_optimized_metadata(
    agent_id="marketing-agent",
    file_path="marketing/product_brochure.pdf", 
    department="marketing",
    category="promotional"
)
results.append(marketing_result)

for i, result in enumerate(results):
    print(f"Upload {i+1}: {result.get('numAdded', 0)} chunks added")

Устранение неполадок

Проблемы с загрузкой файлов

  • Проверьте совместимость формата файла
  • Проверьте ограничения на размер файла

Таймаут обработки

  • Увеличьте таймаут запроса
  • Разбейте большие файлы на части
  • Оптимизируйте размер чанков

Ошибки в БД векторов

  • Проверьте доступность/подключение к БД вектору
  • Убедитесь, что размерность модели эмбеддингов совместима с индексом хранилища