У ваших менеджеров уходит по 2-3 часа в день на поиск ответов в договорах, прайсах и регламентах. Клиент ждёт, пока сотрудник перероет 50 PDF. Половина ответов - неточные, сделки срываются. В этой статье разберём, как собрать ИИ-помощника, который найдёт любой документ за секунду. Без программистов, без курсов на полгода, за вечер.
Почему прототип не работает в бою
Вы наверняка слышали про RAG (Retrieval-Augmented Generation - поиск с дополнением генерации). Прототип собирается за час: загрузили PDF, нарезали, закинули в базу, спросили - ответил. Но когда на него приходит 100 запросов в день, всё ломается: долго, дорого, данные устаревают.
Разберём на примере стройфирмы. У вас есть прайс на работы, типовой договор подряда, техзадания на 50 объектов. Менеджер спрашивает: "Какие штрафы за просрочку?". Прототип ищет по всем документам, но если вчера обновили договор - старые данные уже неактуальны. Или если спросить "условия расторжения", а в договоре написано "прекращение" - векторный поиск может не найти.
Правильная архитектура разделяет два потока:
Поток индексирования:
Хранилище документов -> Парсер -> Нарезка -> Эмбеддинги -> Векторная база
(асинхронно, пакетами, не мешает запросам)
Поток запросов:
Запрос пользователя -> Эмбеддинг -> Векторная база -> Переупорядочение -> LLM -> Ответ
(синхронно, быстро, кешируется при возможности)
Следующий блок кода - пример для вашего менеджера, как сделать индексирование фоновым процессом. Не вникайте в детали, просто покажите тому, кто будет настраивать.
from celery import Celery
import hashlib
app = Celery('indexer', broker='redis://localhost:6379/0')
@app.task(bind=True, max_retries=3)
def index_document(self, doc_id: str, content: str, metadata: dict, force_reindex: bool = False):
try:
# Проверяем хеш - не изменился ли документ
content_hash = hashlib.sha256(content.encode()).hexdigest()
if not force_reindex and document_hash_matches(doc_id, content_hash):
return {'status': 'skipped', 'reason': 'no_changes'}
chunks = create_chunks(content, metadata)
embeddings = embed_chunks(chunks)
upsert_to_qdrant(chunks, embeddings, doc_id)
update_document_hash(doc_id, content_hash)
return {'status': 'ok', 'chunks': len(chunks)}
except Exception as exc:
# Повторная попытка с экспоненциальной задержкой
raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
Трёхуровневое хранилище: не храните всё в одном месте
Хранить все документы в самом быстром хранилище - дорого. Логика простая: часто используемые документы - на быстром и дорогом уровне, редко используемые - на медленном и дешёвом.
- Горячий уровень (векторная база) - активно используемые документы. Стоимость: 10-30 центов за ГБ в месяц.
- Тёплый уровень (Postgres, Elasticsearch) - полный текст всех фрагментов. Стоимость: 2-5 центов за ГБ.
- Холодный уровень (S3) - оригиналы документов, исторические версии. Стоимость: 0.2 цента за ГБ.
Для стройфирмы: горячий уровень - текущий прайс и типовой договор, тёплый - все техзадания, холодный - старые версии договоров.
class TieredRAGStore:
def __init__(self):
self.hot = QdrantClient('http://qdrant:6333') # векторы
self.warm = psycopg2.connect(DATABASE_URL) # тексты + метаданные
self.cold = boto3.client('s3') # оригиналы
def retrieve(self, query: str, k: int = 5) -> list:
# 1. Векторный поиск в горячем уровне
query_vec = embedder.encode(query)
vector_results = self.hot.query_points(
collection_name='chunks',
query=query_vec.tolist(),
limit=20
)
chunk_ids = [r.id for r in vector_results.points]
# 2. Полные тексты из тёплого уровня
chunks = self.fetch_from_warm(chunk_ids)
# 3. Переупорядочение и возврат топ-k
return rerank(query, chunks, top_k=k)
Гибридный поиск: векторный + ключевой - на 15-30% больше точных ответов
Векторный поиск хорош для синонимов, но плохо ищет точные совпадения. Ключевой поиск (BM25) - наоборот. Вместе они дают лучший результат.
Пример: клиент спрашивает "условия расторжения контракта". Векторный поиск найдёт "прекращение договора" (синоним). Ключевой - точное "расторжение". Объединение даёт оба варианта.
import asyncio
from functools import lru_cache
import pickle
@lru_cache(maxsize=1)
def get_bm25():
# Загружаем BM25-индекс один раз при старте сервиса
with open('/data/bm25_index.pkl', 'rb') as f:
return pickle.load(f)
async def hybrid_retrieve(query: str, filters: dict = None, k: int = 20) -> list:
bm25 = get_bm25()
async def dense_search():
vec = await asyncio.to_thread(embedder.encode, query)
return await qdrant_client.async_search(
collection_name='chunks',
query_vector=vec.tolist(),
limit=k
)
def bm25_search():
tokenized = tokenize_ru(query)
scores = bm25.get_scores(tokenized)
top_idx = scores.argsort()[-k:][::-1]
return [(corpus_ids[i], float(scores[i])) for i in top_idx]
# Запускаем параллельно: задержка = max(dense, bm25), не их сумма
dense_results, bm25_results = await asyncio.gather(
dense_search(),
asyncio.to_thread(bm25_search)
)
# Слияние RRF
dense_ranking = [(r.id, r.score) for r in dense_results]
hybrid = reciprocal_rank_fusion([dense_ranking, bm25_results])
return hybrid[:k]
# Итоговая задержка: векторный(8мс) + BM25(3мс) параллельно = ~10мс суммарно
Переупорядочение: последний фильтр перед ответом
Векторный поиск возвращает 20 кандидатов. Языковой модели нужно только 5. Переупорядочение (reranking) - отдельная нейронная сеть, которая точнее оценивает релевантность. Добавляет 200-400 миллисекунд, но качество ответов заметно выше.
Варианты:
- Cohere Rerank v3.5 - облачный сервис, $2 за 1000 запросов. При 50 000 запросов в день = $100/день.
- BGE-reranker-v2-m3 - своя модель на видеокарте. A10G ~$0.75/час = $540/мес. Если больше 18 000 запросов в день - своя видеокарта дешевле.
from cohere import AsyncClient as AsyncCohere
import asyncio
co = AsyncCohere(api_key=COHERE_API_KEY)
async def rerank_results(query: str, candidates: list, top_n: int = 5) -> list:
try:
async with asyncio.timeout(2.0): # не ждём дольше 2 секунд
response = await co.rerank(
model='rerank-v3.5',
query=query,
documents=candidates,
top_n=top_n,
return_documents=True
)
return [{'text': r.document.text, 'score': r.relevance_score} for r in response.results]
except asyncio.TimeoutError:
# Запасной путь: возвращаем первые top_n без переупорядочения
return [{'text': c, 'score': None} for c in candidates[:top_n]]
Обновление индекса без остановки
Каждый раз перестраивать весь индекс при изменении документа - неприемлемо. Нужно точечное обновление: добавили новый документ - обновили только его фрагменты, удалили - пометили как удалённые.
import hashlib
from datetime import datetime
class IncrementalIndexManager:
def __init__(self, qdrant, pg_conn):
self.qdrant = qdrant
self.pg = pg_conn
def upsert_document(self, doc_id: str, content: str, metadata: dict):
content_hash = hashlib.sha256(content.encode()).hexdigest()
with self.pg.cursor() as cur:
cur.execute(
'SELECT content_hash, chunk_ids FROM doc_versions WHERE doc_id = %s',
(doc_id,)
)
prev = cur.fetchone()
if prev and prev['content_hash'] == content_hash:
return # документ не изменился, пропускаем
# Удаляем старые фрагменты если они были
if prev:
self.qdrant.delete(
collection_name='chunks',
points_selector=PointIdsList(points=prev['chunk_ids'])
)
# Индексируем новые фрагменты
new_chunks = create_chunks(content, metadata)
new_chunk_ids = [f'{doc_id}_{i}' for i in range(len(new_chunks))]
embeddings = embedder.encode([c.page_content for c in new_chunks])
self.qdrant.upsert(
collection_name='chunks',
points=[
PointStruct(
id=chunk_id,
vector=emb.tolist(),
payload={**new_chunks[i].metadata, 'content': new_chunks[i].page_content}
)
for i, (chunk_id, emb) in enumerate(zip(new_chunk_ids, embeddings))
]
)
Семантическое кеширование: экономия до 68% стоимости
Обычный кеш хранит ответ только при точном совпадении текста. Семантический кеш - для похожих по смыслу запросов. "Как настроить Qdrant?" и "Настройка Qdrant" - один ответ.
По данным LangChain 2024: семантический кеш с порогом сходства 0.95 снижает обращения к языковой модели на 68.8%.
from langchain.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings
import langchain
# Redis с векторным поиском (RedisStack)
langchain.llm_cache = RedisSemanticCache(
redis_url='redis://localhost:6379',
embedding=OpenAIEmbeddings(model='text-embedding-3-small'),
score_threshold=0.95 # порог косинусного сходства
)
class ContentAwareSemanticCache:
TTL_MAP = {
'faq': 86400 * 7, # ответы на частые вопросы: 7 дней
'news': 3600, # новости: 1 час
'regulations': 86400, # регламенты: 1 день
'realtime': 0 # данные реального времени: не кешировать
}
def get_ttl(self, query: str, context: str) -> int:
if any(kw in context.lower() for kw in ['новости', 'сегодня', 'только что']):
return self.TTL_MAP['realtime']
if any(kw in context.lower() for kw in ['регламент', 'приказ', 'инструкция']):
return self.TTL_MAP['regulations']
return self.TTL_MAP['faq']
Тонкая настройка скорости: m=16, ef=128 - золотая середина
HNSW (Hierarchical Navigable Small World) - алгоритм, который отвечает за скорость поиска в векторной базе. Два параметра влияют на баланс скорости и точности.
- m - количество связей каждого узла. Больше = точнее, но больше памяти.
- ef - глубина поиска. Больше = точнее, но медленнее.
# Ориентиры: корпус 5 миллионов векторов, 16 процессорных ядер
#
# m=8, ef=64: медиана 4мс, 95-й процентиль 8мс, точность@10 = 92.3%
# m=16, ef=128: медиана 8мс, 95-й процентиль 15мс, точность@10 = 95.8% <-- рекомендуется
# m=32, ef=256: медиана 18мс, 95-й процентиль 32мс, точность@10 = 97.9%
# Параметр ef можно менять без пересборки индекса:
client.update_collection(
collection_name='chunks',
hnsw_config=HnswConfigDiff(ef=256)
)
# m и ef_construction требуют полной пересборки индекса
Для стройфирмы с 5000 документов хватит m=16, ef=128. Не гонитесь за максимальной точностью - 95% достаточно.
Мониторинг: как не пропустить деградацию
Рабочая система без мониторинга - чёрный ящик. Отслеживайте:
- время ответа (цель: 95-й процентиль меньше 2 секунд)
- долю кешированных запросов
- стоимость запроса
- частоту сбоев
Langfuse - платформа для трассировки запросов. Логирует каждый этап: поиск, переупорядочение, генерация.
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
import time
langfuse = Langfuse()
@observe()
async def production_rag_query(query: str, user_id: str = None) -> dict:
start = time.monotonic()
cached = semantic_cache.get(query)
if cached:
langfuse_context.update_current_observation(metadata={'cache_hit': True})
return cached
t0 = time.monotonic()
candidates = await hybrid_retrieve(query, k=20)
retrieval_ms = (time.monotonic() - t0) * 1000
t1 = time.monotonic()
top_chunks = await rerank_results(query, candidates, top_n=5)
rerank_ms = (time.monotonic() - t1) * 1000
t2 = time.monotonic()
response = await generate_response(query, top_chunks)
llm_ms = (time.monotonic() - t2) * 1000
total_ms = (time.monotonic() - start) * 1000
langfuse_context.update_current_observation(metadata={
'retrieval_ms': retrieval_ms,
'rerank_ms': rerank_ms,
'llm_ms': llm_ms,
'total_ms': total_ms,
'cache_hit': False
})
if total_ms > 2000: # превышение целевого времени ответа
alert_ops(f'RAG latency SLA breach: {total_ms:.0f}ms')
result = {'answer': response, 'sources': top_chunks}
semantic_cache.set(query, result)
return result
Типичные ошибки
- Запускать индексирование и обработку запросов в одном процессе. Индексация блокирует ответы.
- Не устанавливать ограничение на число повторных попыток. Бесконечный цикл съедает ресурсы.
- Игнорировать семантическое кеширование. Это самая простая оптимизация с наибольшим эффектом.
- Выбирать максимальные параметры HNSW без замеров. Часто m=16, ef=128 дают 95% точности при вдвое меньшей задержке.
Частые вопросы
Qdrant, pgvector или Weaviate: что выбрать при бюджете $500 в месяц?
pgvector на управляемом PostgreSQL: 5 млн векторов ~30 ГБ, инстанс с 64 ГБ памяти ~$200-250. Задержка 95-го процентиля <30 мс. Qdrant на двух серверах c5.2xlarge: $180, задержка 12-15 мс, лучшая фильтрация. Рекомендация: pgvector если уже используете PostgreSQL, Qdrant если нужна минимальная задержка.
Как сменить модель эмбеддингов без полного перестроения?
Смена модели требует перестройки - векторные пространства несовместимы. Но можно без остановки: создать коллекцию v2 с новой моделью, индексировать в фоне, перенаправить трафик на v2, удалить v1 через 48 часов.
Переупорядочение добавляет 300 мс - стоит ли?
Если цель 2 секунды, а генерация занимает 1.2 секунды - 300 мс вписываются. Своя модель на видеокарте даёт 70-100 мс вместо 300-400 у облачного сервиса.
Как не допустить устаревания кеша при частых обновлениях?
При обновлении документа публикуйте событие в Redis Pub/Sub. Подписчик кеша инвалидирует связанные записи. Для регламентов с редкими обновлениями достаточно TTL на 1 день.
Как изолировать данные разных клиентов в одном кластере?
Три подхода: отдельная коллекция на клиента (полная изоляция, накладные расходы при 1000+ клиентах), фильтр по идентификатору клиента в одном индексе (Qdrant делает это эффективно), пространства имён в Pinecone. До 500 клиентов - фильтр по идентификатору. От 500 - отдельные коллекции.
Что делать прямо сейчас
Разделение потоков индексирования и запросов, гибридный поиск, семантическое кеширование и мониторинг - это четыре кита рабочей системы. Каждый решает конкретную проблему: перегрузку, неполноту поиска, стоимость, невидимость.
Ваш следующий шаг: возьмите Qdrant (бесплатный план на 1 ГБ), подключите OpenAI для эмбеддингов (или бесплатный ruBERT-encoder), настройте семантический кеш через Redis. Всё это настраивается за 2-4 часа. Не нужно нанимать программиста - справится ваш технический менеджер.
Если хотите готовый шаблон промпта и пошаговую инструкцию для вашей ниши - подписывайтесь на канал.
AI Компас (t.me/kosmoslab_ai) - канал для предпринимателей в РФ и СНГ, которые применяют AI в своём бизнесе без программиста. Разбираем инструменты и схемы - без курсов и теории.