Мониторинг наблюдаемости RAG‑конвейеров
Большие языковые модели (LLM) обучаются на огромных объемах данных. Однако они могут иметь определенные ограничения.
- Данные для обучения являются статическими и ограничены датой актуализации знаний.
- Большие языковые модели могут генерировать недостоверные сведения в случаях, когда ответ отсутствует в их базе знаний.
Этот пример иллюстрирует распространенный подход к преодолению этих ограничений, который заключается в использовании конвейера генерации с расширенным поиском (Retrieval-Augmented Generation, RAG) для предоставления LLM дополнительной контекстной информации из авторитетного источника знаний, что приводит к более точным ответам и большему контролю над генерируемым результатом.
Чему вы научитесь?
В этом уроке мы создадим простой API на Python, который использует LangChain для реализации чат-бота, предлагающего туристические направления для вашей следующей поездки.
- Чат-бот представляет собой конвейер RAG, использующий Pinecone для получения релевантной информации для желаемого пользователем места назначения.
- Мы получаем ответ, используя Ollama в качестве линейной модели.
- Мы используем OpenTelemetry для мониторинга кода, чтобы получить подробную информацию о производительности нашего API.
Прежде чем начать
Предварительные требования
- Доступ к вашей среде Kubernetes
- Бесплатный аккаунт Pinecone
Шаги
Общие этапы следующие:
- Создание API-ключей для подключения к Pinecone и Ключ-АСТРОМ.
- Развертка нашего приложение в кластере Kubernetes.
- Визуализация важных сигналов приложения, чтобы отслеживать затраты и качество ответов.
Подробности каждого этапа см. ниже.
Подготовка ключей API
На этом этапе мы создадим два API-ключа и сохраним их в качестве секретов Kubernetes. API-ключи будут использоваться для подключения к Ключ-АСТРОМ и Pinecone.
Создание токена Ключ-АСТРОМ
Для создания токена Ключ-АСТРОМ
- В Ключ-АСТРОМ перейдите в раздел Токены доступа.
Чтобы найти Токены доступа, нажмите CTRL+K для поиска и выберите Токены доступа. - В разделе Токены доступа выберите Сгенерировать новый токен.
- Введите имя для вашего нового токена.
- Предоставьте своему новому токену следующие права доступа:
- Найдите и выберите все следующие области применения.
- Метрики приема (
metrics.ingest) - Логи загрузки (
logs.ingest) - События приема (
events.ingest) - Приём трассировок OpenTelemetry (
openTelemetryTrace.ingest) - Чтение метрик (
metrics.read) - Настройки записи (
settings.write)
- Метрики приема (
- Выберите Сгенерировать токен.
- Скопируйте сгенерированный токен в буфер обмена. Сохраните токен в менеджере паролей для дальнейшего использования.
| Вы можете получить доступ к своему токену только один раз при его создании. После этого вы не сможете его раскрыть. |
Сохранение ключа API в качестве секрета Kubernetes
Теперь, когда у вас есть токен с необходимыми правами доступа, вы можете использовать следующую команду для сохранения ключа API Ключ-АСТРОМ в качестве секрета Kubernetes. Наше приложение на Python будет использовать его для отправки данных мониторинга вашему клиенту.
| kubectl create secret generic astromkey --from-literal token=<your-api-key> -n travel-advisor |
Если ошибка вызвана отсутствием пространства имен
Если команда возвращает ошибку из-за отсутствия пространства имен, вы можете создать его, выполнив команду kubectl create namespace travel-advisor.
Подключение к Pinecone
Для подключения к Pinecone
- Создайте новый индекс
travel-advisorс размерами 3200 и метрикойcosine.
Индекс будет хранить наш источник знаний, который конвейер RAG будет использовать для расширения выходных данных LLM, касающихся рекомендаций по путешествиям. Обоснование выбора параметров будет обсуждаться далее, в разделе развертывания . - После создания и запуска индекса мы можем создать ключ API для подключения.
Следуйте инструкциям в документации Pinecone по аутентификации, чтобы получить ключ API для подключения к вашему индексу Pinecone и сохранить его в качестве секретов Kubernetes с помощью следующей команды:
| kubectl create secret generic pinecone --from-literal api-key=<your-api-key> -n travel-advisor |
Создание и развертка конвейера RAG
В нашем демонстрационном приложении мы используем Ollama для генерации ответа.
Мы также используем Ollama для эмбеддинга — процесса преобразования текста в векторы, которые отражают его семантическое значение. Эмбеддинг позволяет представить источник знаний в виде математических объектов, хранящихся в индексе Pinecone. Аналогичным образом мы можем обработать ввод пользователя, найти схожие векторы в индексе Pinecone и предоставить дополнительной информации большой языковой модели (LLM) для формирования итогового ответа.
1. Создайте новое пространство имен Kubernetes с именем ollama.
| apiVersion: v1
kind: Namespace metadata: name: ollama labels: name: ollama |
2. Разверните последнюю версию Ollama, работающую на порту 11434.
По умолчанию контейнер Ollama не содержит никакой модели. Нам необходимо указать контейнеру, какую модель следует загрузить, и сделать её доступной через его API. Для этого мы указываем в Kubernetes команду ollama run orca-mini:3b после запуска контейнера, которая, в свою очередь, указывает Ollama загрузить модель LLM orca-mini:3b.
| apiVersion: apps/v1
kind: Deployment metadata: name: ollama namespace: ollama spec: selector: matchLabels: name: ollama template: metadata: labels: name: ollama spec: containers: - name: ollama image: ollama/ollama:latest ports: - name: http containerPort: 11434 protocol: TCP lifecycle: postStart: exec: command: [ "/bin/sh", "-c", "ollama run orca-mini:3b" ] |
3. Укажите Kubernetes, чтобы он предоставлял доступ к этим API другим контейнерам через сервис http://ollama.ollama.
| apiVersion: v1
kind: Service metadata: name: ollama namespace: ollama spec: type: ClusterIP selector: name: ollama ports: - port: 80 name: http targetPort: 11434 protocol: TCP |
4. Теперь, когда Ollama запущен, мы можем создать наш конвейер LangChain RAG с помощью некоторого кода на Python.
Создайте объект, который будет использоваться для связи с Ollama для выполнения этапа встраивания. Ollama возвращает векторы orca-mini:3b фиксированного размера 3200. Именно поэтому мы установили свойство dimensions нашего индекса Pinecone равным 3200.
| from langchain_community.embeddings import OllamaEmbeddings
embeddings = OllamaEmbeddings(model="orca-mini:3b", base_url="http://ollama.ollama") |
5. Загрузите документы из локальной файловой системы.
В данном примере у нас имеется несколько HTML‑страниц с рекомендациями о том, какие места стоит посетить в этих городах.
| # Retrieve the source data
docs_list = [] for item in os.listdir(path="destinations"): if item.endswith(".html"): item_docs_list = BSHTMLLoader(file_path=f"destinations/{item}").load() for item in item_docs_list: docs_list.append(item) |
6. Разбейте документы на фрагменты.
Разделение текста на фрагменты важно, поскольку большие языковые модели (LLM) имеют известное ограничение — окно контекста (context window), определяющее границы объёма текста, который модель способна обработать и осмыслить. Разбиение документов на фрагменты позволяет LLM‑модели эффективно анализировать их содержимое и использовать его для формирования ответа.
| # Split Document into tokens
text_splitter = RecursiveCharacterTextSplitter() documents = text_splitter.split_documents(docs_list) logger.info("Loading documents from PineCone...") vector = PineconeVectorStore.from_documents( documents, index_name="travel-advisor", # PineCone index to use embedding=embeddings # we're telling LangChain to use Ollama for the embedding step ) retriever = vector.as_retriever() |
7. Инициализируйте модель LLM и оформите ввод пользователя в виде промпта, который задаёт ожидаемый от модели ответ.
Мы используем шаблон с двумя переменными:
input: это поле заполняется текстом, введенным пользователем.context: этот раздел содержит актуальную информацию, полученную из индекса Pinecone.
| llm = ChatOllama(model="orca-mini:3b", base_url="http://ollama.ollama")
prompt = ChatPromptTemplate.from_template(""" 1. Use the following pieces of context to answer the question as travel advise at the end. 2. Keep the answer crisp and limited to 3,4 sentences. Context: {context} Question: {input} Helpful Answer:""") document_prompt = PromptTemplate( input_variables=["page_content", "source"], template="content:{page_content}\nsource:{source}", ) |
8. Соберите все воедино и создайте наш конвейер RAG.
Процесс конвейера выполняет следующие шаги:
- Обращается к Ollama для создания векторного эмбеддинга ввода пользователя.
- Обращается к Pinecone для поиска релевантных документов на основе полученного векторного эмбеддинга.
- Обращается к Ollama для генерации ответа туристического консультанта с использованием промпта, содержащего контекст, соответствующий вводу пользователя.
| document_chain = create_stuff_documents_chain(
llm=llm, prompt=prompt, document_prompt=document_prompt, ) chain = create_retrieval_chain(retriever, document_chain) response = chain.invoke({"input": prompt}) |
| Полный пример кода |
|---|
| from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_community.document_loaders import BSHTMLLoader from langchain_community.chat_models import ChatOllama from langchain_community.embeddings import OllamaEmbeddings from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain.chains.combine_documents import create_stuff_documents_chain from langchain.chains import create_retrieval_chain from langchain_pinecone import PineconeVectorStore import logging import os from fastapi import FastAPI from fastapi.staticfiles import StaticFiles import uvicorn from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from traceloop.sdk import Traceloop from traceloop.sdk.decorators import workflow from telemetry.token_count import TokenUsageCallbackHandler from telemetry.langchain import LangchainInstrumentor # Read secrets from the mounted volume def read_token(): return read_secret('token') def read_pinecone_key(): return read_secret('api-key') def read_secret(secret: str): try: with open(f"/etc/secrets/{secret}", "r") as f: return f.read().rstrip() except Exception as e: print("No token was provided") print(e) return "" # Expose the PineCone key as env var for initialization by LangChain os.environ['PINECONE_API_KEY'] = read_pinecone_key() OTEL_ENDPOINT = os.environ.get("OTEL_ENDPOINT", "http://localhost:4317") OLLAMA_ENDPOINT = os.environ.get("OLLAMA_ENDPOINT", "http://localhost:11434") # GLOBALS AI_MODEL = os.environ.get("AI_MODEL", "orca-mini:3b") AI_SYSTEM = "ollama" AI_EMBEDDING_MODEL = os.environ.get("AI_EMBEDDING_MODEL", "orca-mini:3b") MAX_PROMPT_LENGTH = 50 retrieval_chain = None # Initialise the logger logging.basicConfig(level=logging.INFO, filename="run.log") logger = logging.getLogger(__name__) # ################ # # CONFIGURE OPENTELEMETRY resource = Resource.create({ "service.name": "travel-advisor", "service.version": "0.1.0" }) TOKEN = read_token() headers = { "Authorization": f"Api-Token {TOKEN}" } provider = TracerProvider(resource=resource) processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=f"{OTEL_ENDPOINT}", headers=headers)) provider.add_span_processor(processor) trace.set_tracer_provider(provider) otel_tracer = trace.get_tracer("travel-advisor") Traceloop.init(app_name="travel-advisor", api_endpoint=OTEL_ENDPOINT, disable_batch=True, headers=headers) def prep_system(): # Create the embedding embeddings = OllamaEmbeddings(model=AI_EMBEDDING_MODEL, base_url=OLLAMA_ENDPOINT) # Retrieve the source data docs_list = [] for item in os.listdir(path="destinations"): if item.endswith(".html"): item_docs_list = BSHTMLLoader(file_path=f"destinations/{item}").load() for item in item_docs_list: docs_list.append(item) # Split Document into tokens text_splitter = RecursiveCharacterTextSplitter() documents = text_splitter.split_documents(docs_list) logger.info("Loading documents from PineCone...") vector = PineconeVectorStore.from_documents( documents, index_name="travel-advisor", embedding=embeddings ) retriever = vector.as_retriever() logger.info("Initialising Llama LLM...") llm = ChatOllama(model=AI_MODEL, base_url=OLLAMA_ENDPOINT) prompt = ChatPromptTemplate.from_template(""" 1. Use the following pieces of context to answer the question as travel advise at the end. 2. Keep the answer crisp and limited to 3,4 sentences. Context: {context} Question: {input} Helpful Answer:""") document_prompt = PromptTemplate( input_variables=["page_content", "source"], template="content:{page_content}\nsource:{source}", ) document_chain = create_stuff_documents_chain( llm=llm, prompt=prompt, document_prompt=document_prompt, ) return create_retrieval_chain(retriever, document_chain) ############ # CONFIGURE ENDPOINTS app = FastAPI() #################################### @app.get("/api/v1/completion") def submit_completion(prompt: str): with otel_tracer.start_as_current_span(name="/api/v1/completion") as span: return submit_completion(prompt, span) @workflow(name="travelgenerator") def submit_completion(prompt: str, span): if prompt: logger.info(f"Calling RAG to get the answer to the question: {prompt}...") response = retrieval_chain.invoke({"input": prompt}, config={ "callbacks": [TokenUsageCallbackHandler()], }) # Log information for DQL to grab logger.info(f"Response: {response}. Using RAG. model={AI_MODEL}. prompt={prompt}") return {"message": response['answer']} else: # No, or invalid prompt given span.add_event(f"No prompt provided or prompt too long (over {MAX_PROMPT_LENGTH} chars)") return {"message": f"No prompt provided or prompt too long (over {MAX_PROMPT_LENGTH} chars)"} #################################### @app.get("/api/v1/thumbsUp") @otel_tracer.start_as_current_span("/api/v1/thumbsUp") def thumbs_up(prompt: str): logger.info(f"Positive user feedback for search term: {prompt}") @app.get("/api/v1/thumbsDown") @otel_tracer.start_as_current_span("/api/v1/thumbsDown") def thumbs_down(prompt: str): logger.info(f"Negative user feedback for search term: {prompt}") if __name__ == "__main__": retrieval_chain = prep_system() # Mount static files at the root app.mount("/", StaticFiles(directory="./public", html=True), name="public") #app.mount("/destinations", StaticFiles(directory="destinations", html = True), name="destinations") # Run the app using uvicorn uvicorn.run(app, host="0.0.0.0", port=8080) |
| Пример манифеста развертки Kubernetes |
|---|
| ---
apiVersion: v1 kind: Namespace metadata: name: travel-advisor labels: name: travel-advisor --- apiVersion: apps/v1 kind: Deployment metadata: name: travel-advisor namespace: travel-advisor spec: selector: matchLabels: name: travel-advisor template: metadata: labels: name: travel-advisor spec: containers: - name: travel-advisor image: travel-advisor:v0.1.3 ports: - name: http containerPort: 8080 protocol: TCP env: - name: OTEL_ENDPOINT value: "https://<YOUR_ENV>.live.astromkey.com/api/v2/otlp" - name: OLLAMA_ENDPOINT value: "http://ollama.ollama" - name: TRACELOOP_TELEMETRY value: "false" imagePullPolicy: Always volumeMounts: - name: secrets readOnly: true mountPath: "/etc/secrets" volumes: - name: secrets projected: sources: - secret: name: astromkey - secret: name: pinecone --- apiVersion: v1 kind: Service metadata: name: travel-advisor namespace: travel-advisor spec: type: LoadBalancer selector: name: travel-advisor ports: - port: 80 name: http targetPort: 8080 protocol: TCP |
Проанализируйте свой конвейер RAG
В данном примере мы представили упрощённый RAG‑конвейер, однако он уже включает интенсивное взаимодействие с внешними сервисами. Комплексная наблюдаемость (observability) обязательна для контроля производительности, затрат и качества ответов, предоставляемых большой языковой моделью (LLM).
К счастью, нам не требуется вручную инструментировать кодовую базу и собирать ключевые сигналы. Мы можем использовать OpenTelemetry для получения трейсов и метрик — в частности, OpenLLMetry.
1. Добавьте следующую строку в наш код, и мы сможем использовать возможности Ключ-АСТРОМ для мониторинга наших рабочих нагрузок в области искусственного интеллекта.
| headers = { "Authorization": "Api-Token <YOUR_DT_API_TOKEN>" }
Traceloop.init( app_name="travel-advisor", api_endpoint="https://<YOUR_ENV>.live.astromkey.com/api/v2/otlp", disable_batch=True, headers=headers ) |
2. Теперь мы можем просматривать трассировки, описывающие каждый шаг, выполняемый конвейером LangChain RAG, и выявлять узкие места, предлагать улучшения или отслеживать, если сервис больше недоступен.
Однако одной трассировки недостаточно для оценки работоспособности наших AI‑нагрузок. Для этого мы можем настроить дашборд, отображающие ключевые метрики сервисов. Например, можно отслеживать количество входных/выходных токенов, задержку сервисов или настроить SLO (цели уровня обслуживания) при достижении порогового значения потребления токенов.
OpenTelemetry предлагает семантическую конвенцию GenAI (GenAI Semantic Convention), которую можно использовать для написания DQL‑запросов и визуализации значимых сигналов AI‑нагрузок. Соответствующие атрибуты в этой области начинаются с префикса gen_ai.
Например, мы можем вывести список названий используемых моделей.
| fetch spans
| summarize models = collectDistinct(gen_ai.request.model) | expand models | sort models |
| Пример оповещения о превышении лимита обслуживания (SLO) на основе потребления токенов |
|---|
| fetch spans
| filter gen_ai.response.model == "orca-mini:3b" | makeTimeseries total = max(gen_ai.usage.output_tokens + gen_ai.usage.input_tokens), baseline = avg(gen_ai.usage.output_tokens + gen_ai.usage.input_tokens) | fieldsAdd sli = (baseline[]/total[])*100 | fieldsRemove baseline, total |
| Пример построения графика времени отклика для различных типов вызовов LLM |
|---|
| fetch spans
| filter gen_ai.request.model == "orca-mini:3b" and llm.request.type != "" | fieldsKeep duration, gen_ai.request.model, llm.request.type, end_time | makeTimeseries avg(duration), time: end_time, by: {llm.request.type} | append [ fetch spans | filter gen_ai.request.model == "orca-mini:3b" and llm.request.type != "" | makeTimeseries requests=count() ] |
| Пример сопоставления входных данных и подсказок для ответа |
|---|
| fetch spans
| filter gen_ai.request.model == "orca-mini:3b" and llm.request.type == "chat" | fieldsAdd prompt = gen_ai.prompt.0.content | fieldsAdd response = gen_ai.completion.0.content | fields prompt, response |