Операция обновления или вставки данных
Существует два основных способа выполнения upsert (операция обновления или вставки данных) в БД векторов:
- База документов (рекомендуется)
- Upsert (операция обновления или вставки данных) через Агента
Мы настоятельно рекомендуем использовать базу документов, так как они предоставляют единый интерфейс для работы с RAG-конвейерами — получение данных из разных источников, стратегия разбиения на части (chunking), загрузка в векторную базу данных, синхронизация с обновлёнными данными. В этом руководстве мы рассмотрим другой метод — Upsert через Агента. Это более старая методика, применявшаяся до появления базы документов.
Подробности см. в документации к Vector Upsert Endpoint API.
Понимание процесса обновления или вставки данных
Агент позволяет создавать поток, который может выполнять как обновление/вставку данных (upserting), так и процесс запросов с использованием RAG (Retrieval-Augmented Generation). Оба эти процесса могут работать независимо друг от друга.
Настройка
Чтобы выполнить процесс обновления или вставки данных (upsert), необходимо создать поток с пятью различными узлами:
- Загрузчик документов (Document Loader)
- Разделитель текста (Text Splitter)
- Модель векторных вложений (Embedding Model)
- БД векторов (Vector Store)
- БД записей (Record Manager) — опционально
Все эти элементы подробнее описаны в разделе «База документов» — обращайтесь туда за дополнительной информацией.
После правильной настройки потока в правом верхнем углу появится зеленая кнопка, которая позволяет запустить процесс обновления/вставки.
Также процесс upsert можно выполнить через API:
Базовый URL и аутентификация
- Базовый URL: http://localhost:3000 (или URL вашего экземпляра OSMI)
- Точка входа: POST /api/v1/vector/upsert/:id
- Аутентификация: см. раздел «Аутентификация для потоков»
Методы запроса
API поддерживает два типа методов в зависимости от конфигурации вашего agent:
- Form Data (загрузка файлов) Используется, если ваш аге содержит загрузчики документов с возможностью загрузки файлов.
- JSON Body (без загрузки файлов) Используется, если загрузчики документов не требуют загрузки файлов (например, веб-скраперы, коннекторы баз данных).
Чтобы переопределить настройки узлов, такие как файлы, метаданные и т. д., необходимо явно включить соответствующую опцию.
Загрузчики документов с возможностью загрузки файлов
Поддерживаемые типы документов
| Загрузчик документов | Типы файлов |
|---|---|
| CSV File | .csv |
| Docx/Word File | .docx |
| JSON File | .json |
| JSON Lines File | .jsonl |
| PDF File | .pdf |
| Text File | .txt |
| Excel File | .xlsx |
| Powerpoint File | .pptx |
| File Loader | Multiple |
| Unstructured File | Multiple |
Важно: Убедитесь, что тип файла соответствует настройкам вашего загрузчика документов. Для максимальной гибкости рекомендуется использовать загрузчик File Loader, который поддерживает несколько типов файлов.
Формат запроса
При загрузке файлов используйте multipart/form-data вместо JSON.
Примеры
import requests
import os
def upsert_document(agent_id, file_path, config=None):
"""
Upsert a single document to a vector store.
Args:
agent_id (str): The agent ID configured for vector upserting
file_path (str): Path to the file to upload
return_source_docs (bool): Whether to return source documents in response
config (dict): Optional configuration overrides
Returns:
dict: API response containing upsert results
"""
url = f"http://localhost:3000/api/v1/vector/upsert/{agent_id}"
# Prepare file data
files = {
'files': (os.path.basename(file_path), open(file_path, 'rb'))
}
# Prepare form data
data = {}
# Add configuration overrides if provided
if config:
data['overrideConfig'] = str(config).replace("'", '"') # Convert to JSON string
try:
response = requests.post(url, files=files, data=data)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Upload failed: {e}")
return None
finally:
# Always close the file
files['files'][1].close()
# Example usage
result = upsert_document(
agent_id="your-agent-id",
file_path="documents/knowledge_base.pdf",
config={
"chunkSize": 1000,
"chunkOverlap": 200
}
)
if result:
print(f"Successfully upserted {result.get('numAdded', 0)} chunks")
if result.get('sourceDocuments'):
print(f"Source documents: {len(result['sourceDocuments'])}")
else:
print("Upload failed")
class VectorUploader {
constructor(baseUrl = 'http://localhost:3000') {
this.baseUrl = baseUrl;
}
async upsertDocument(agentId, file, config = {}) {
/**
* Upload a file to vector store from browser
* @param {string} agentId - The agent ID
* @param {File} file - File object from input element
* @param {Object} config - Optional configuration
*/
const formData = new FormData();
formData.append('files', file);
if (config.overrideConfig) {
formData.append('overrideConfig', JSON.stringify(config.overrideConfig));
}
try {
const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${agentId}`, {
method: 'POST',
body: formData
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const result = await response.json();
return result;
} catch (error) {
console.error('Upload failed:', error);
throw error;
}
}
}
// Example usage in browser
const uploader = new VectorUploader();
// Single file upload
document.getElementById('fileInput').addEventListener('change', async function(e) {
const file = e.target.files[0];
if (file) {
try {
const result = await uploader.upsertDocument(
'your-agent-id',
file,
{
overrideConfig: {
chunkSize: 1000,
chunkOverlap: 200
}
}
);
console.log('Upload successful:', result);
alert(`Successfully processed ${result.numAdded || 0} chunks`);
} catch (error) {
console.error('Upload failed:', error);
alert('Upload failed: ' + error.message);
}
}
});
const fs = require('fs');
const path = require('path');
const FormData = require('form-data');
const fetch = require('node-fetch');
class NodeVectorUploader {
constructor(baseUrl = 'http://localhost:3000') {
this.baseUrl = baseUrl;
}
async upsertDocument(agentId, filePath, config = {}) {
/**
* Upload a file to vector store from Node.js
* @param {string} agentId - The agent ID
* @param {string} filePath - Path to the file
* @param {Object} config - Optional configuration
*/
if (!fs.existsSync(filePath)) {
throw new Error(`File not found: ${filePath}`);
}
const formData = new FormData();
const fileStream = fs.createReadStream(filePath);
formData.append('files', fileStream, {
filename: path.basename(filePath),
contentType: this.getMimeType(filePath)
});
if (config.overrideConfig) {
formData.append('overrideConfig', JSON.stringify(config.overrideConfig));
}
try {
const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${agentId}`, {
method: 'POST',
body: formData,
headers: formData.getHeaders()
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`HTTP ${response.status}: ${errorText}`);
}
return await response.json();
} catch (error) {
console.error('Upload failed:', error);
throw error;
}
}
getMimeType(filePath) {
const ext = path.extname(filePath).toLowerCase();
const mimeTypes = {
'.pdf': 'application/pdf',
'.txt': 'text/plain',
'.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'.csv': 'text/csv',
'.json': 'application/json'
};
return mimeTypes[ext] || 'application/octet-stream';
}
}
// Example usage
async function main() {
const uploader = new NodeVectorUploader();
try {
// Single file upload
const result = await uploader.upsertDocument(
'your-agent-id',
'./documents/manual.pdf',
{
overrideConfig: {
chunkSize: 1200,
chunkOverlap: 100
}
}
);
console.log('Single file upload result:', result);
} catch (error) {
console.error('Process failed:', error);
}
}
// Run if this file is executed directly
if (require.main === module) {
main();
}
module.exports = { NodeVectorUploader };
# Basic file upload with cURL
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
-F "files=@documents/knowledge_base.pdf"
# File upload with configuration override
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
-F "files=@documents/manual.pdf" \
-F 'overrideConfig={"chunkSize": 1000, "chunkOverlap": 200}'
# Upload with custom headers for authentication (if configured)
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
-H "Authorization: Bearer your-api-token" \
-F "files=@documents/faq.txt" \
-F 'overrideConfig={"chunkSize": 800, "chunkOverlap": 150}'
Загрузчики документов без загрузки файлов
Для загрузчиков документов, которые не требуют загрузки файлов (например, веб-скрейперы, соединители с базами данных, интеграции API), используйте формат JSON, аналогичный API для предсказаний.
Примеры
import requests
from typing import Dict, Any, Optional
def upsert(agent_id: str, config: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
"""
Trigger vector upserting for agents that don't require file uploads.
Args:
agent_id: The agent ID configured for vector upserting
config: Optional configuration overrides
Returns:
API response containing upsert results
"""
url = f"http://localhost:3000/api/v1/vector/upsert/{agent_id}"
payload = {
"overrideConfig": config
}
headers = {
"Content-Type": "application/json"
}
try:
response = requests.post(url, json=payload, headers=headers, timeout=300)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Upsert failed: {e}")
return None
result = upsert(
agent_id="agent-id",
config={
"chunkSize": 800,
"chunkOverlap": 100,
}
)
if result:
print(f"Upsert completed: {result.get('numAdded', 0)} chunks added")
class NoFileUploader {
constructor(baseUrl = 'http://localhost:3000') {
this.baseUrl = baseUrl;
}
async upsertWithoutFiles(agentId, config = {}) {
/**
* Trigger vector upserting for flows that don't need file uploads
* @param {string} agentId - The agent ID
* @param {Object} config - Configuration overrides
*/
const payload = {
overrideConfig: config
};
try {
const response = await fetch(`${this.baseUrl}/api/v1/vector/upsert/${agentId}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(payload)
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
return await response.json();
} catch (error) {
console.error('Upsert failed:', error);
throw error;
}
}
async scheduledUpsert(agentId, interval = 3600000) {
/**
* Set up scheduled upserting for dynamic content sources
* @param {string} agentId - The agent ID
* @param {number} interval - Interval in milliseconds (default: 1 hour)
*/
console.log(`Starting scheduled upsert every ${interval/1000} seconds`);
const performUpsert = async () => {
try {
console.log('Performing scheduled upsert...');
const result = await this.upsertWithoutFiles(agentId, {
addMetadata: {
scheduledUpdate: true,
timestamp: new Date().toISOString()
}
});
console.log(`Scheduled upsert completed: ${result.numAdded || 0} chunks processed`);
} catch (error) {
console.error('Scheduled upsert failed:', error);
}
};
// Perform initial upsert
await performUpsert();
// Set up recurring upserts
return setInterval(performUpsert, interval);
}
}
// Example usage
const uploader = new NoFileUploader();
async function performUpsert() {
try {
const result = await uploader.upsertWithoutFiles(
'agent-id',
{
chunkSize: 800,
chunkOverlap: 100
}
);
console.log('Upsert result:', result);
} catch (error) {
console.error('Upsert failed:', error);
}
}
// One time upsert
await performUpsert();
// Set up scheduled updates (every 30 minutes)
const schedulerHandle = await uploader.scheduledUpsert(
'dynamic-content-agent-id',
30 * 60 * 1000
);
// To stop scheduled updates later:
// clearInterval(schedulerHandle);
# Basic upsert with cURL
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
-H "Content-Type: application/json"
# Upsert with configuration override
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
-H "Content-Type: application/json" \
-d '{
"overrideConfig": {
"returnSourceDocuments": true
}
}'
# Upsert with custom headers for authentication (if configured)
curl -X POST "http://localhost:3000/api/v1/vector/upsert/your-agent-id" \
-H "Authorization: Bearer your-api-token" \
-H "Content-Type: application/json"
Поля ответа
| Field | Type | Description |
|---|---|---|
numAdded | number | Количество новых чанков, добавленных в векторное хранилище |
numDeleted | number | Количество удалённых чанков (если используется Record Manager) |
numSkipped | number | Количество пропущенных чанков (если используется Record Manager) |
numUpdated | number | Количество обновлённых существующих чанков (если есть Record Manager) |
Стратегии оптимизации
1. Пакетная обработка
def intelligent_batch_processing(files: List[str], agent_id: str) -> Dict[str, Any]:
"""Process files in optimized batches based on size and type."""
# Group files by size and type
small_files = []
large_files = []
for file_path in files:
file_size = os.path.getsize(file_path)
if file_size > 5_000_000: # 5MB
large_files.append(file_path)
else:
small_files.append(file_path)
results = {'successful': [], 'failed': [], 'totalChunks': 0}
# Process large files individually
for file_path in large_files:
print(f"Processing large file: {file_path}")
# Individual processing with custom config
# ... implementation
# Process small files in batches
batch_size = 5
for i in range(0, len(small_files), batch_size):
batch = small_files[i:i + batch_size]
print(f"Processing batch of {len(batch)} small files")
# Batch processing
# ... implementation
return results
2. Оптимизация метаданных
import requests
import os
from datetime import datetime
from typing import Dict, Any
def upsert_with_optimized_metadata(agent_id: str, file_path: str,
department: str = None, category: str = None) -> Dict[str, Any]:
"""
Upsert document with automatically optimized metadata.
"""
url = f"http://localhost:3000/api/v1/vector/upsert/{agent_id}"
# Generate optimized metadata
custom_metadata = {
'department': department or 'general',
'category': category or 'documentation',
'indexed_date': datetime.now().strftime('%Y-%m-%d'),
'version': '1.0'
}
optimized_metadata = optimize_metadata(file_path, custom_metadata)
# Prepare request
files = {'files': (os.path.basename(file_path), open(file_path, 'rb'))}
data = {
'overrideConfig': str({
'metadata': optimized_metadata
}).replace("'", '"')
}
try:
response = requests.post(url, files=files, data=data)
response.raise_for_status()
return response.json()
finally:
files['files'][1].close()
# Example usage with different document types
results = []
# Technical documentation
tech_result = upsert_with_optimized_metadata(
agent_id="tech-docs-agent",
file_path="docs/api_reference.pdf",
department="engineering",
category="technical_docs"
)
results.append(tech_result)
# HR policies
hr_result = upsert_with_optimized_metadata(
agent_id="hr-docs-agent",
file_path="policies/employee_handbook.pdf",
department="human_resources",
category="policies"
)
results.append(hr_result)
# Marketing materials
marketing_result = upsert_with_optimized_metadata(
agent_id="marketing-agent",
file_path="marketing/product_brochure.pdf",
department="marketing",
category="promotional"
)
results.append(marketing_result)
for i, result in enumerate(results):
print(f"Upload {i+1}: {result.get('numAdded', 0)} chunks added")
Устранение неполадок
Проблемы с загрузкой файлов
- Проверьте совместимость формата файла
- Проверьте ограничения на размер файла
Таймаут обработки
- Увеличьте таймаут запроса
- Разбейте большие файлы на части
- Оптимизируйте размер чанков
Ошибки в БД векторов
- Проверьте доступность/подключение к БД вектору
- Убедитесь, что размерность модели эмбеддингов совместима с индексом хранилища

