diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 82eddab..7c6116c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -27,4 +27,4 @@ repos: language: system types: [python] pass_filenames: false - stages: [pre-push] + stages: [pre-commit] diff --git a/app/api/dependencies.py b/app/api/dependencies.py new file mode 100644 index 0000000..1449f03 --- /dev/null +++ b/app/api/dependencies.py @@ -0,0 +1,112 @@ +from functools import lru_cache + +from fastapi import Depends + +from app.core.config import Settings, get_settings +from app.services.cache_service import CacheService +from app.services.category_manager import CategoryManager +from app.services.chat_memory_service import ChatMemoryService +from app.services.guardrails_service import GuardrailsService +from app.services.lead_service import LeadService +from app.services.openai_service import OpenAIService +from app.services.price_comparator import PriceComparator +from app.services.rag_engine import RAGEngine +from app.services.scraper_service import ScraperService +from app.services.telegram_service import TelegramService +from app.services.vector_service import VectorService +from app.services.woo_service import WooService + + +@lru_cache(maxsize=1) +def get_openai_service() -> OpenAIService: + """Provide a singleton instance of OpenAIService.""" + return OpenAIService(get_settings()) + + +@lru_cache(maxsize=1) +def get_vector_service() -> VectorService: + """Provide a singleton instance of VectorService.""" + return VectorService(get_settings()) + + +@lru_cache(maxsize=1) +def get_woo_service() -> WooService: + """Provide a singleton instance of WooService.""" + return WooService(get_settings()) + + +@lru_cache(maxsize=1) +def get_scraper_service() -> ScraperService: + """Provide a singleton instance of ScraperService.""" + return ScraperService(get_settings()) + + +@lru_cache(maxsize=1) +def get_cache_service() -> CacheService: + """Provide a singleton instance of 
CacheService.""" + return CacheService(get_settings()) + + +@lru_cache(maxsize=1) +def get_telegram_service() -> TelegramService: + """Provide a singleton instance of TelegramService.""" + return TelegramService(get_settings()) + + +@lru_cache(maxsize=1) +def get_category_manager() -> CategoryManager: + """Provide a singleton instance of CategoryManager.""" + return CategoryManager() + + +@lru_cache(maxsize=1) +def get_guardrails_service() -> GuardrailsService: + """Provide a singleton instance of GuardrailsService.""" + return GuardrailsService() + + +@lru_cache(maxsize=1) +def get_chat_memory_service() -> ChatMemoryService: + """Provide a singleton instance of ChatMemoryService.""" + return ChatMemoryService() + + +def get_lead_service( + telegram_service: TelegramService = Depends(get_telegram_service), + chat_memory_service: ChatMemoryService = Depends(get_chat_memory_service), +) -> LeadService: + """Provide a LeadService instance.""" + return LeadService(telegram_service, chat_memory_service) + + +def get_price_comparator( + woo_service: WooService = Depends(get_woo_service), + scraper_service: ScraperService = Depends(get_scraper_service), + cache_service: CacheService = Depends(get_cache_service), + settings: Settings = Depends(get_settings), +) -> PriceComparator: + """Provide a PriceComparator instance constructed from its dependencies.""" + return PriceComparator(woo_service, scraper_service, cache_service, settings) + + +def get_rag_engine( + openai_service: OpenAIService = Depends(get_openai_service), + vector_service: VectorService = Depends(get_vector_service), + price_comparator: PriceComparator = Depends(get_price_comparator), + telegram_service: TelegramService = Depends(get_telegram_service), + category_manager: CategoryManager = Depends(get_category_manager), + guardrails_service: GuardrailsService = Depends(get_guardrails_service), + chat_memory_service: ChatMemoryService = Depends(get_chat_memory_service), + settings: Settings = 
Depends(get_settings), +) -> RAGEngine: + """Dependency injection for RAGEngine.""" + return RAGEngine( + openai_service=openai_service, + vector_service=vector_service, + price_comparator=price_comparator, + telegram_service=telegram_service, + category_manager=category_manager, + guardrails_service=guardrails_service, + chat_memory_service=chat_memory_service, + settings=settings, + ) diff --git a/app/api/v1/endpoints/chat.py b/app/api/v1/endpoints/chat.py index 179983c..bf85baa 100644 --- a/app/api/v1/endpoints/chat.py +++ b/app/api/v1/endpoints/chat.py @@ -1,68 +1,23 @@ import uuid from collections.abc import AsyncGenerator -from functools import lru_cache from fastapi import APIRouter, Depends, Request from fastapi.responses import StreamingResponse -from app.core.config import get_settings +from app.api.dependencies import get_rag_engine from app.core.constants import MSG_STREAM_CHAT_FAILED, MSG_SYNC_CHAT_FAILED from app.core.logging_config import get_logger from app.core.metrics import bot_messages_total from app.core.security import verify_api_key from app.middleware.rate_limiter import limiter from app.schemas.chat import ChatRequest, ChatResponse -from app.services.cache_service import CacheService -from app.services.category_manager import CategoryManager -from app.services.chat_memory_service import ChatMemoryService -from app.services.guardrails_service import GuardrailsService -from app.services.openai_service import OpenAIService -from app.services.price_comparator import PriceComparator from app.services.rag_engine import RAGEngine -from app.services.scraper_service import ScraperService -from app.services.telegram_service import TelegramService -from app.services.vector_service import VectorService -from app.services.woo_service import WooService from app.utils.network import get_client_ip logger = get_logger(__name__) chat_router = APIRouter() -@lru_cache -def get_cached_rag_engine() -> RAGEngine: - """Get cached instance of RAG engine.""" - 
settings = get_settings() - openai_service = OpenAIService(settings) - vector_service = VectorService(settings) - - woo_service = WooService(settings) - scraper_service = ScraperService(settings) - cache_service = CacheService(settings) - - price_comparator = PriceComparator(woo_service, scraper_service, cache_service, settings) - telegram_service = TelegramService(settings) - category_manager = CategoryManager() - guardrails_service = GuardrailsService() - chat_memory_service = ChatMemoryService() - - return RAGEngine( - openai_service=openai_service, - vector_service=vector_service, - price_comparator=price_comparator, - telegram_service=telegram_service, - category_manager=category_manager, - guardrails_service=guardrails_service, - chat_memory_service=chat_memory_service, - settings=settings, - ) - - -def get_rag_engine() -> RAGEngine: - """Dependency injection for RAGEngine.""" - return get_cached_rag_engine() - - @chat_router.post("", response_model=ChatResponse) @limiter.limit("20/minute") # type: ignore async def chat_completion( diff --git a/app/api/v1/endpoints/health.py b/app/api/v1/endpoints/health.py index ba9ca30..ad63ae5 100644 --- a/app/api/v1/endpoints/health.py +++ b/app/api/v1/endpoints/health.py @@ -1,9 +1,11 @@ -from fastapi import APIRouter, HTTPException, status -from openai import AsyncOpenAI -from pinecone import Pinecone # type: ignore[reportMissingTypeStubs] +import asyncio -from app.core.config import get_settings +from fastapi import APIRouter, Depends, HTTPException, status + +from app.api.dependencies import get_openai_service, get_vector_service from app.core.logging_config import get_logger +from app.services.openai_service import OpenAIService +from app.services.vector_service import VectorService logger = get_logger(__name__) health_router = APIRouter() @@ -16,21 +18,18 @@ async def health_check() -> dict[str, str]: @health_router.get("/ready", status_code=status.HTTP_200_OK) -async def readiness_check() -> dict[str, str]: +async 
def readiness_check( + openai_service: OpenAIService = Depends(get_openai_service), + vector_service: VectorService = Depends(get_vector_service), +) -> dict[str, str]: """Check availability of dependencies: Pinecone and OpenAI (Readiness probe).""" - settings = get_settings() components_status = {"status": "ready", "pinecone": "ok", "openai": "ok"} errors: list[str] = [] # Check OpenAI try: - openai_client = AsyncOpenAI( - api_key=settings.openai_api_key.get_secret_value(), - timeout=5.0, - max_retries=0, - ) # Call model listing as lightweight check - await openai_client.models.list() + await openai_service.client.models.list() except Exception as e: logger.warning("OpenAI readiness check failed", error=str(e)) components_status["openai"] = "failed" @@ -38,9 +37,8 @@ async def readiness_check() -> dict[str, str]: # Check Pinecone try: - pinecone_client = Pinecone(api_key=settings.pinecone_api_key.get_secret_value()) # Check service access (listing indices) - pinecone_client.list_indexes() + await asyncio.to_thread(vector_service.pc.list_indexes) except Exception as e: logger.warning("Pinecone readiness check failed", error=str(e)) components_status["pinecone"] = "failed" diff --git a/app/api/v1/endpoints/ingest.py b/app/api/v1/endpoints/ingest.py index f3c52da..4699045 100644 --- a/app/api/v1/endpoints/ingest.py +++ b/app/api/v1/endpoints/ingest.py @@ -2,7 +2,7 @@ from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile, status -from app.core.config import Settings, get_settings +from app.api.dependencies import get_openai_service, get_vector_service from app.core.logging_config import get_logger from app.core.security import verify_api_key from app.middleware.rate_limiter import limiter @@ -18,14 +18,6 @@ MAX_FILE_SIZE = 10 * 1024 * 1024 # 10 MB -def get_openai_service(settings: Settings = Depends(get_settings)) -> OpenAIService: - return OpenAIService(settings) - - -def get_vector_service(settings: Settings = Depends(get_settings)) -> 
VectorService: - return VectorService(settings) - - @ingest_router.post("/document", response_model=IngestResponse) @limiter.limit("5/minute") # type: ignore[reportUntypedFunctionDecorator, reportUnknownMemberType] async def upload_document( @@ -35,6 +27,7 @@ async def upload_document( openai_service: OpenAIService = Depends(get_openai_service), vector_service: VectorService = Depends(get_vector_service), ) -> IngestResponse: + """Upload a document for ingestion into the RAG vector store.""" filename = file.filename or "unknown" logger.info("Starting document ingestion", filename=filename) @@ -72,7 +65,7 @@ async def upload_document( return IngestResponse(document_id=document_id, chunks_count=len(chunks), status="indexed") except Exception as e: - logger.error("Ingestion pipeline failed", error=str(e), document_id=document_id) + logger.exception("Ingestion pipeline failed", document_id=document_id) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to process document" ) from e @@ -87,6 +80,7 @@ async def upload_text( openai_service: OpenAIService = Depends(get_openai_service), vector_service: VectorService = Depends(get_vector_service), ) -> IngestResponse: + """Ingest raw text into the RAG vector store.""" logger.info("Starting text ingestion") document_id = generate_document_id("raw_text") chunks = chunk_text(payload.text) @@ -113,7 +107,7 @@ async def upload_text( return IngestResponse(document_id=document_id, chunks_count=len(chunks), status="indexed") except Exception as e: - logger.error("Text ingestion failed", error=str(e), document_id=document_id) + logger.exception("Text ingestion failed", document_id=document_id) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to process text" ) from e diff --git a/app/api/v1/endpoints/leads.py b/app/api/v1/endpoints/leads.py index 56a5cac..1a8a8b1 100644 --- a/app/api/v1/endpoints/leads.py +++ b/app/api/v1/endpoints/leads.py @@ -1,132 +1,21 @@ 
-import re -from typing import Any -from urllib.parse import urlparse, urlunparse +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, status -import httpx -import sentry_sdk -from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, status -from tenacity import ( - RetryError, - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential, -) - -from app.core.config import get_settings -from app.core.constants import ( - ALERT_BOT_LEAD, - ALERT_HOT_LEAD, - BTN_DECLINE, - BTN_IN_PROGRESS, - BTN_PRODUCT_LINK, - BTN_SUCCESS, - MAX_PAYLOAD_SIZE, -) -from app.core.db import AsyncSessionLocal, commit_with_retry -from app.core.metrics import leads_created_total +from app.api.dependencies import get_lead_service +from app.core.constants import MAX_PAYLOAD_SIZE from app.middleware.rate_limiter import limiter -from app.models.lead import Lead from app.schemas.lead import ContactFormLead -from app.services.chat_memory_service import ChatMemoryService -from app.services.telegram_service import TelegramService +from app.services.lead_service import LeadService leads_router = APIRouter() -@retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=2, max=10), - retry=retry_if_exception_type(httpx.RequestError), -) -async def send_telegram_notification( - lead_id: int, - message: str, - alert_type: str, - reply_markup: dict[str, Any] | None = None, - session_id: str | None = None, -) -> None: - """Send notification to Telegram with retry mechanism.""" - settings = get_settings() - telegram_service = TelegramService(settings) - await telegram_service.send_alert( - message, alert_type=alert_type, reply_markup=reply_markup, session_id=session_id - ) - - -async def process_lead_background( - lead_id: int, message: str, alert_type: str = "lead", session_id: str | None = None -) -> None: - """Process lead in background and send Telegram notification.""" - reply_markup = { - "inline_keyboard": [ - [ - {"text": 
BTN_SUCCESS, "callback_data": f"lead_status:{lead_id}:success"}, - {"text": BTN_DECLINE, "callback_data": f"lead_status:{lead_id}:decline"}, - ], - [{"text": BTN_IN_PROGRESS, "callback_data": f"lead_status:{lead_id}:in_progress"}], - ] - } - async with AsyncSessionLocal() as session: - try: - if session_id: - chat_memory_service = ChatMemoryService() - history = await chat_memory_service.get_history(session_id, limit=10) - if history: - bot_msgs = [m["content"] for m in history if m["role"] == "bot"] - product_url = None - if bot_msgs: - for msg in reversed(bot_msgs): - match = re.search(r"", msg) - if match: - raw_url = match.group(1) - parsed = urlparse(raw_url) - product_url = urlunparse( - ( - parsed.scheme, - parsed.netloc, - parsed.path, - parsed.params, - "", - parsed.fragment, - ) - ) - break - - if product_url: - reply_markup["inline_keyboard"].insert( - 0, [{"text": BTN_PRODUCT_LINK, "url": product_url}] - ) - - await send_telegram_notification( - lead_id, message, alert_type, reply_markup=reply_markup, session_id=session_id - ) - - # Update status to sent - lead = await session.get(Lead, lead_id) - if lead: - lead.notification_status = "sent" # type: ignore - await commit_with_retry(session) - except RetryError: - # If all attempts are exhausted - sentry_sdk.capture_message(f"Telegram API failed for lead_id={lead_id}", level="error") - lead = await session.get(Lead, lead_id) - if lead: - lead.notification_status = "failed" # type: ignore - await commit_with_retry(session) - except Exception as e: - sentry_sdk.capture_message( - f"Unexpected error for lead_id={lead_id}: {e}", level="error" - ) - lead = await session.get(Lead, lead_id) - if lead: - lead.notification_status = "failed" # type: ignore - await commit_with_retry(session) - - @leads_router.post("", status_code=status.HTTP_201_CREATED) @limiter.limit("3/minute") # type: ignore -async def create_lead(request: Request, background_tasks: BackgroundTasks) -> dict[str, str]: +async def create_lead( + 
request: Request, + background_tasks: BackgroundTasks, + lead_service: LeadService = Depends(get_lead_service), +) -> dict[str, str]: """Create a new lead from contact form or checkout.""" content_length = request.headers.get("content-length") if content_length and int(content_length) > MAX_PAYLOAD_SIZE: @@ -154,57 +43,12 @@ async def create_lead(request: Request, background_tasks: BackgroundTasks) -> di if lead_data.honeypot: return {"status": "success", "message": "Lead received"} - # Asynchronous write to DB - async with AsyncSessionLocal() as session: - db_lead = Lead( - name=lead_data.name, - surname=lead_data.surname, - phone_number=lead_data.phone_number, - contact_method=lead_data.contact_method, - lead_type=lead_data.lead_type, - delivery_address=lead_data.delivery_address, - notification_status="pending", - session_id=lead_data.session_id, - ) - session.add(db_lead) - await commit_with_retry(session) - await session.refresh(db_lead) - lead_id = int(db_lead.id) # type: ignore - - from datetime import datetime - from zoneinfo import ZoneInfo - - tz = ZoneInfo("Europe/Kyiv") - now_str = datetime.now(tz).strftime("%d.%m.%Y %H:%M") client_ip = request.client.host if request.client else "Unknown" - if lead_data.lead_type == "checkout": - message = ALERT_HOT_LEAD.format( - lead_id=lead_id, - now_str=now_str, - name=lead_data.name, - surname=lead_data.surname or "", - phone=lead_data.phone_number, - method=lead_data.contact_method, - address=lead_data.delivery_address or "Не вказана", - ip=client_ip, - ) - leads_created_total.labels(type="checkout", status="success").inc() - background_tasks.add_task( - process_lead_background, lead_id, message, "hot_lead", lead_data.session_id - ) - else: - message = ALERT_BOT_LEAD.format( - lead_id=lead_id, - now_str=now_str, - name=lead_data.name, - phone=lead_data.phone_number, - method=lead_data.contact_method, - ip=client_ip, - ) - leads_created_total.labels(type="contact", status="success").inc() - 
background_tasks.add_task( - process_lead_background, lead_id, message, "lead", lead_data.session_id - ) + lead_id, message, alert_type = await lead_service.create_contact_lead(lead_data, client_ip) + + background_tasks.add_task( + lead_service.process_lead_background, lead_id, message, alert_type, lead_data.session_id + ) return {"status": "success", "message": "Lead received"} diff --git a/app/api/v1/endpoints/telegram.py b/app/api/v1/endpoints/telegram.py index a1495a4..404f784 100644 --- a/app/api/v1/endpoints/telegram.py +++ b/app/api/v1/endpoints/telegram.py @@ -1,7 +1,6 @@ import logging from typing import Any -import httpx from fastapi import APIRouter, BackgroundTasks, Request, status from app.core.config import get_settings @@ -15,79 +14,71 @@ telegram_router = APIRouter() -async def answer_callback_query(callback_query_id: str, token: str, text: str = "") -> None: - """Answer a Telegram callback query.""" - url = f"https://api.telegram.org/bot{token}/answerCallbackQuery" - payload = {"callback_query_id": callback_query_id, "text": text} - async with httpx.AsyncClient() as client: - try: - await client.post(url, json=payload) - except Exception as e: - logger.error(f"Failed to answer callback query: {e}") - - async def process_telegram_update(update: dict[str, Any]) -> None: """Process incoming Telegram update.""" - if "callback_query" not in update: - return - - callback_query = update["callback_query"] - callback_id = callback_query.get("id") - data = callback_query.get("data", "") - message = callback_query.get("message", {}) - message_id = message.get("message_id") - chat_id = message.get("chat", {}).get("id") - original_text = message.get("text", "") or message.get("caption", "") - - settings = get_settings() - telegram_service = TelegramService(settings) - - if callback_id and settings.telegram_bot_token: - await answer_callback_query(callback_id, settings.telegram_bot_token) - - if data.startswith("lead_status:"): - parts = data.split(":") - if 
len(parts) == 3: - _, lead_id_str, new_status = parts - try: - lead_id = int(lead_id_str) - except ValueError: - return - - status_map = { - "success": BTN_SUCCESS, - "decline": BTN_DECLINE, - "in_progress": BTN_IN_PROGRESS, - } - status_text = status_map.get(new_status, new_status) - - # Update DB - async with AsyncSessionLocal() as session: - lead = await session.get(Lead, lead_id) - if lead: - lead.status = new_status # type: ignore - await commit_with_retry(session) - - # Update message (remove buttons, add status to end of text) - if original_text and message_id and chat_id: - # Since Telegram returns text without HTML tags, it's better to just append the string to the end. - # But if we lose formatting, it could be a problem. - # Simplest way: leave the message, but update ReplyMarkup to a single "Status: ..." button or remove buttons. - - # Leave unclickable button or update text - new_markup = { - "inline_keyboard": [ - [ - { - "text": BTN_STATUS.format(status_text=status_text), - "callback_data": "ignore", - } - ] - ] + try: + if "callback_query" not in update: + return + + callback_query = update["callback_query"] + callback_id = callback_query.get("id") + data = callback_query.get("data", "") + message = callback_query.get("message", {}) + message_id = message.get("message_id") + chat_id = message.get("chat", {}).get("id") + original_text = message.get("text", "") or message.get("caption", "") + + settings = get_settings() + telegram_service = TelegramService(settings) + + if callback_id and settings.telegram_bot_token: + await telegram_service.answer_callback_query(callback_id) + + if data.startswith("lead_status:"): + parts = data.split(":") + if len(parts) == 3: + _, lead_id_str, new_status = parts + try: + lead_id = int(lead_id_str) + except ValueError: + return + + status_map = { + "success": BTN_SUCCESS, + "decline": BTN_DECLINE, + "in_progress": BTN_IN_PROGRESS, } - await telegram_service.update_message_reply_markup( - message_id=message_id, 
reply_markup=new_markup, chat_id=chat_id - ) + status_text = status_map.get(new_status, new_status) + + # Update DB + async with AsyncSessionLocal() as session: + lead = await session.get(Lead, lead_id) + if lead: + lead.status = new_status # type: ignore + await commit_with_retry(session) + + # Update message (remove buttons, add status to end of text) + if original_text and message_id and chat_id: + # Since Telegram returns text without HTML tags, it's better to just append the string to the end. + # But if we lose formatting, it could be a problem. + # Simplest way: leave the message, but update ReplyMarkup to a single "Status: ..." button or remove buttons. + + # Leave unclickable button or update text + new_markup = { + "inline_keyboard": [ + [ + { + "text": BTN_STATUS.format(status_text=status_text), + "callback_data": "ignore", + } + ] + ] + } + await telegram_service.update_message_reply_markup( + message_id=message_id, reply_markup=new_markup, chat_id=chat_id + ) + except Exception: + logger.exception("Error processing telegram update") @telegram_router.post("/webhook", status_code=status.HTTP_200_OK) @@ -96,7 +87,7 @@ async def telegram_webhook(request: Request, background_tasks: BackgroundTasks) try: update = await request.json() background_tasks.add_task(process_telegram_update, update) - except Exception as e: - logger.error(f"Error parsing telegram update: {e}") + except Exception: + logger.exception("Error parsing telegram update") return {"status": "ok"} diff --git a/app/api/v1/endpoints/woo_webhook.py b/app/api/v1/endpoints/woo_webhook.py index 1c9d90c..4747daa 100644 --- a/app/api/v1/endpoints/woo_webhook.py +++ b/app/api/v1/endpoints/woo_webhook.py @@ -1,94 +1,22 @@ import logging -from fastapi import APIRouter, BackgroundTasks, status -from pydantic import BaseModel +from fastapi import APIRouter, BackgroundTasks, Depends, status -from app.core.config import get_settings -from app.core.constants import ALERT_WOO_ORDER -from app.core.db import 
AsyncSessionLocal, commit_with_retry -from app.core.metrics import leads_created_total -from app.models.lead import Lead -from app.services.telegram_service import TelegramService +from app.api.dependencies import get_lead_service +from app.schemas.order import WooOrderPayload +from app.services.lead_service import LeadService logger = logging.getLogger(__name__) woo_webhook_router = APIRouter() -class WooOrderItem(BaseModel): - name: str - quantity: int - total: str - sku: str | None = None - - -class WooOrderPayload(BaseModel): - order_id: int | str - session_id: str - total: str - currency: str - first_name: str - last_name: str - phone: str - items: list[WooOrderItem] - - -async def process_woo_order_background(payload: WooOrderPayload) -> None: - """Process WooCommerce order in background.""" - settings = get_settings() - telegram_service = TelegramService(settings) - - # Create or update lead in the database - async with AsyncSessionLocal() as session: - try: - db_lead = Lead( - name=payload.first_name, - surname=payload.last_name, - phone_number=payload.phone, - contact_method="woo_checkout", - lead_type="checkout", - session_id=payload.session_id, - woo_order_id=str(payload.order_id), - status="success", # Immediate success because it's a sale - notification_status="pending", - ) - session.add(db_lead) - await commit_with_retry(session) - leads_created_total.labels(type="conversion", status="success").inc() - - # Form message - items_lines: list[str] = [] - for item in payload.items: - sku_info = f" (Арт: {item.sku})" if item.sku else "" - items_lines.append( - f"- {item.name}{sku_info} (x{item.quantity}) - {item.total} {payload.currency}" - ) - items_str = "\n".join(items_lines) - message = ALERT_WOO_ORDER.format( - order_id=payload.order_id, - first_name=payload.first_name, - last_name=payload.last_name, - phone=payload.phone, - total=payload.total, - currency=payload.currency, - items_str=items_str, - session_id=payload.session_id, - ) - - alert_success 
= await telegram_service.send_alert( - message, alert_type="conversion", session_id=payload.session_id - ) - - db_lead.notification_status = "sent" if alert_success else "failed" # type: ignore - await commit_with_retry(session) - except Exception as e: - logger.error(f"Error processing woo order webhook: {e}") - - @woo_webhook_router.post("/woo-order", status_code=status.HTTP_200_OK) async def woo_order_webhook( - payload: WooOrderPayload, background_tasks: BackgroundTasks + payload: WooOrderPayload, + background_tasks: BackgroundTasks, + lead_service: LeadService = Depends(get_lead_service), ) -> dict[str, str]: """Handle WooCommerce order webhook.""" - background_tasks.add_task(process_woo_order_background, payload) + background_tasks.add_task(lead_service.process_woo_order_background, payload) return {"status": "success", "message": "Webhook received"} diff --git a/app/core/config.py b/app/core/config.py index 24bf550..5a07fb0 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -8,6 +8,12 @@ class Settings(BaseSettings): openai_api_key: SecretStr openai_model: str = "gpt-4o-mini" embedding_model: str = "text-embedding-3-small" + model_pricing: dict[str, tuple[float, float]] = { + "gpt-4o": (5.0, 15.0), + "gpt-4o-mini": (0.150, 0.600), + "text-embedding-3-small": (0.02, 0.0), + "text-embedding-3-large": (0.13, 0.0), + } pinecone_api_key: SecretStr pinecone_environment: str diff --git a/app/core/constants.py b/app/core/constants.py index 6b438c5..385d3ee 100644 --- a/app/core/constants.py +++ b/app/core/constants.py @@ -1,4 +1,5 @@ # app/core/constants.py +import re # --- UI Messages / Responses --- MSG_GUARDRAIL_FAILED = "Вибачте, виникла технічна затримка при обробці запиту. Будь ласка, переформулюйте питання стосовно товарів." @@ -51,7 +52,6 @@ "⚠️ Помилка Скрапера!\n📦 Товар: {safe_product_name}\nНе вдалося отримати ціну." 
) -REGEX_PRODUCT_NAME_HISTORY = r"['\"«»]([^'\"«»]+)['\"«»]" # --- Intent Names --- INTENT_PRODUCT = "product" @@ -64,8 +64,8 @@ INTENT_ORDER_STATUS = "order_status" # --- Regex Patterns --- -REGEX_PHONE = r"(?:\+380|380|0)\d{9}" -REGEX_CLEAN_QUERY = r"[\s\-\(\)]" +REGEX_PHONE = re.compile(r"(?:\+380|380|0)\d{9}") +REGEX_CLEAN_QUERY = re.compile(r"[\s\-\(\)]") # --- Common Settings --- MAX_PAYLOAD_SIZE = 2048 @@ -190,3 +190,25 @@ r"(?is)новые\s*инструкции", r"(?is)выведи.{0,20}(?:инструкции|промпт|правила)", ] + +# --- Intent Handler Instructions --- +INSTR_PRICE_CHANGED_ALERT = ( + "КРИТИЧНО: Нам потрібно уточнити актуальну ціну на цей товар. " + "КАТЕГОРИЧНО ЗАБОРОНЕНО казати клієнту, що ціна змінилася, або давати посилання на чекаут. " + "Прямо зараз попроси клієнта залишити номер телефону тут або написати нашому менеджеру в Telegram/Viber для узгодження фінальної ціни." +) +FACT_CHECKOUT_PRODUCT = ( + "Data: Product '{product_name}', актуальна та підтверджена ціна {woo_price} UAH. Conditions: {status_text}.\n" + "CRITICAL: Товар знайдено і ціна підтверджена. Запропонуй оформити замовлення. If 'In stock' - 1-3 days. 'Under order' - 14-20 days.\n" + "УВАГА: Бекенд вже згенерував UI-форму (кнопку) оформлення замовлення. КАТЕГОРИЧНО ЗАБОРОНЕНО питати у клієнта будь-які деталі замовлення (номер телефону, ПІБ, адресу, спосіб доставки чи оплати) безпосередньо в чаті! Просто скажи клієнту натиснути кнопку оформлення замовлення (або посилання), яка з'явилась." +) +FACT_INFO_PRODUCT = ( + "Data: Product '{product_name}', Ціна {woo_price} UAH. Conditions: {status_text}.\n" + "CRITICAL: If 'In stock' - 1-3 days. 'Under order' - 14-20 days. DO NOT PUT LINKS IN TEXT!" +) +INSTR_SEARCH_FALLBACK = "Бекенд знайшов товари за розширеним пошуком або категорією. Якщо клієнт питав загалом (наприклад 'монітори 4к'), просто презентуй їх. ТІЛЬКИ якщо клієнт шукав КОНКРЕТНУ модель, ввічливо скажи, що її немає і запропонуй ці альтернативи." 
+INSTR_ORDER_ID_MISSING = "Не вдалося визначити номер замовлення. Попроси клієнта вказати точний номер замовлення (тільки цифри)." +INSTR_ORDER_NOT_FOUND = "Замовлення з номером {order_id} не знайдено. Перепроси та спитай, чи можливо клієнт помилився цифрою або номером." +INSTR_ORDER_STATUS = "Клієнт запитує про своє замовлення. Використовуй надані дані (статус, суму, товари, доставку), щоб ввічливо відповісти йому. Не вигадуй інформацію, якої немає." +INSTR_NO_PHONE_IN_CHAT = "КРИТИЧНО: Бекенд вже згенерував UI-форму/кнопку для клієнта. В тексті повідомлення категорично ЗАБОРОНЕНО просити клієнта писати номер телефону, ПІБ, способи доставки чи оплати в чат. Всі дані клієнт має ввести у форму." +INSTR_NO_PREPAYMENT = "УВАГА: Вартість товару менше 40 000 грн. ЗАБОРОНЕНО згадувати про будь-яку 'передоплату', 'аванс' або умови для сум понад 40 тис. Це заплутує клієнта." diff --git a/app/core/db.py b/app/core/db.py index 6b3af70..889eb68 100644 --- a/app/core/db.py +++ b/app/core/db.py @@ -26,7 +26,7 @@ def set_sqlite_pragma(dbapi_connection: sqlite3.Connection, connection_record: A @retry( stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=0.1, min=0.1, max=2.0), + wait=wait_exponential(multiplier=0.1, min=0.1, max=0.5), retry=retry_if_exception_type(OperationalError), ) async def commit_with_retry(session: Any) -> None: @@ -47,11 +47,12 @@ async def init_db(): """Initializes the database and creates all tables.""" # Import models here so that Base.metadata.create_all can see them, # while avoiding circular imports and E402 from Ruff. 
- from app.models.chat_memory import ChatMessage + from app.models.chat_memory import ChatMessage, SessionState from app.models.lead import Lead # Stub so that Pylance/Ruff don't complain about "unused import" _ = ChatMessage + _ = SessionState _ = Lead async with engine.begin() as conn: diff --git a/app/core/logging_config.py b/app/core/logging_config.py index 86857f6..6130603 100644 --- a/app/core/logging_config.py +++ b/app/core/logging_config.py @@ -1,5 +1,8 @@ +import atexit import logging +import queue import sys +from logging.handlers import QueueHandler, QueueListener from pathlib import Path from typing import Any @@ -56,16 +59,34 @@ def setup_logging(log_level: str) -> None: file_handler = logging.FileHandler(log_file_path, encoding="utf-8") file_handler.setFormatter(formatter) + class StructlogQueueHandler(QueueHandler): + def prepare(self, record: logging.LogRecord) -> logging.LogRecord: + return record + + # Non-blocking async queue + log_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1) + queue_handler = StructlogQueueHandler(log_queue) + + listener = QueueListener(log_queue, stream_handler, file_handler, respect_handler_level=True) + listener.start() + atexit.register(listener.stop) + root_logger = logging.getLogger() root_logger.handlers.clear() - root_logger.addHandler(stream_handler) - root_logger.addHandler(file_handler) + root_logger.addHandler(queue_handler) root_logger.setLevel(numeric_level) + class UvicornEndpointFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + msg = record.getMessage() + return "GET /metrics " not in msg and "GET /metrics/ " not in msg + for logger_name in ("uvicorn", "uvicorn.error", "uvicorn.access"): uvicorn_logger = logging.getLogger(logger_name) uvicorn_logger.handlers.clear() uvicorn_logger.propagate = True + if logger_name == "uvicorn.access": + uvicorn_logger.addFilter(UvicornEndpointFilter()) def get_logger(name: str) -> structlog.stdlib.BoundLogger: diff --git a/app/main.py 
b/app/main.py index cfc47c2..809cad5 100644 --- a/app/main.py +++ b/app/main.py @@ -1,4 +1,5 @@ import contextlib +import os from collections.abc import AsyncGenerator from pathlib import Path @@ -34,9 +35,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: setup_logging(settings.log_level) # --- ADDED FOR CACHE FIX --- - from app.services.cache_service import CacheService + from app.api.dependencies import get_cache_service - cache_service = CacheService(settings) + cache_service = get_cache_service() await cache_service.initialize() if settings.sentry_dsn: @@ -49,9 +50,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: logger.info("Application started", env=settings.pinecone_environment) - from app.services.vector_service import VectorService + from app.api.dependencies import get_vector_service - vector_service = VectorService(settings) + vector_service = get_vector_service() await vector_service.initialize() # TECHNICAL DEBT: @@ -66,7 +67,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: from app.services.statistics_service import gather_and_send_daily_report_job - jobstores = {"default": SQLAlchemyJobStore(url="sqlite:///bubblebrain.db")} + jobstores = {"default": SQLAlchemyJobStore(url="sqlite:///digitaldreams.db")} scheduler = AsyncIOScheduler(jobstores=jobstores) # type: ignore scheduler.add_job( # type: ignore[reportUnknownMemberType] export_categories_to_csv, @@ -85,8 +86,11 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: replace_existing=True, ) - scheduler.start() # type: ignore - logger.info("APScheduler started: jobs scheduled") + if os.getenv("RUN_CRON", "false").lower() == "true": + scheduler.start() # type: ignore + logger.info("APScheduler started: jobs scheduled") + else: + logger.info("APScheduler not started: RUN_CRON is not 'true'") yield @@ -95,16 +99,15 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None]: scheduler.shutdown(wait=False) # type: ignore # Close HTTP clients - from 
app.api.v1.endpoints.chat import get_cached_rag_engine + from app.api.dependencies import get_scraper_service - rag_engine = get_cached_rag_engine() - # Pylance does not know that RAGEngine has price_comparator with scraper_service due to lack of type hints - # but at runtime this is correct. We can just use # type: ignore to satisfy Pylance - await rag_engine.price_comparator.scraper_service.close() # type: ignore + scraper_service = get_scraper_service() + await scraper_service.close() - from app.services.woo_service import close_woo_client + from app.api.dependencies import get_woo_service - await close_woo_client() + woo_service = get_woo_service() + await woo_service.close() if settings.sentry_dsn: sentry_sdk.flush() diff --git a/app/middleware/rate_limiter.py b/app/middleware/rate_limiter.py index 11f17ef..33ba928 100644 --- a/app/middleware/rate_limiter.py +++ b/app/middleware/rate_limiter.py @@ -2,4 +2,6 @@ from app.utils.network import get_client_ip +# TODO(TechDebt): Configure Limiter to use Redis as a storage backend for production +# to ensure rate limits are synced across multiple workers/instances. 
limiter = Limiter(key_func=get_client_ip) diff --git a/app/middleware/request_logging.py b/app/middleware/request_logging.py index 9d53572..e3ae3b7 100644 --- a/app/middleware/request_logging.py +++ b/app/middleware/request_logging.py @@ -8,6 +8,8 @@ logger = get_logger(__name__) +EXCLUDED_PATHS = {"/metrics", "/metrics/", "/health"} + class RequestLoggingMiddleware: def __init__(self, app: ASGIApp) -> None: @@ -17,7 +19,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: if scope["type"] != "http": return await self.app(scope, receive, send) - request_id = str(uuid.uuid4()) + request_id = uuid.uuid4().hex start_time = time.perf_counter() bound_logger = logger.bind(request_id=request_id) @@ -41,10 +43,11 @@ async def send_wrapper(message: Message) -> None: raise exc finally: process_time = time.perf_counter() - start_time - bound_logger.info( - "Request completed", - method=method, - path=path, - status_code=status_code, - latency=round(process_time, 4), - ) + if path not in EXCLUDED_PATHS: + bound_logger.info( + "Request completed", + method=method, + path=path, + status_code=status_code, + latency=round(process_time, 4), + ) diff --git a/app/models/chat_memory.py b/app/models/chat_memory.py index e36a60a..72bf159 100644 --- a/app/models/chat_memory.py +++ b/app/models/chat_memory.py @@ -1,6 +1,5 @@ -import datetime - from sqlalchemy import Column, DateTime, Integer, String, Text +from sqlalchemy.sql import func from app.core.db import Base @@ -12,4 +11,17 @@ class ChatMessage(Base): session_id = Column(String(100), index=True, nullable=False) role = Column(String(20), nullable=False) # 'user' or 'bot' content = Column(Text, nullable=False) - created_at = Column(DateTime, default=lambda: datetime.datetime.now(datetime.UTC), index=True) + created_at = Column(DateTime, server_default=func.now(), index=True) + + +class SessionState(Base): + __tablename__ = "session_states" + + session_id = Column(String(100), primary_key=True, 
index=True) + last_search_query = Column(String(255), nullable=True) + last_products = Column(Text, nullable=True) # JSON list + updated_at = Column( + DateTime, + server_default=func.now(), + onupdate=func.now(), + ) diff --git a/app/models/lead.py b/app/models/lead.py index 58601d2..91ef3d0 100644 --- a/app/models/lead.py +++ b/app/models/lead.py @@ -1,10 +1,35 @@ -import datetime +import enum -from sqlalchemy import Column, DateTime, Integer, String +from sqlalchemy import Column, DateTime, Enum, Integer, String +from sqlalchemy.sql import func from app.core.db import Base +class LeadType(enum.Enum): + contact = "contact" + checkout = "checkout" + + +class ContactMethod(enum.Enum): + telegram = "telegram" + viber = "viber" + phone = "phone" + + +class NotificationStatus(enum.Enum): + pending = "pending" + sent = "sent" + failed = "failed" + + +class LeadStatus(enum.Enum): + new = "new" + success = "success" + decline = "decline" + in_progress = "in_progress" + + class Lead(Base): __tablename__ = "leads" @@ -12,11 +37,11 @@ class Lead(Base): name = Column(String(50), nullable=False) surname = Column(String(50), nullable=True) phone_number = Column(String(50), nullable=False) - contact_method = Column(String(20), nullable=False) - lead_type = Column(String(20), default="contact") # contact, checkout + contact_method = Column(Enum(ContactMethod), nullable=False) + lead_type = Column(Enum(LeadType), default=LeadType.contact) delivery_address = Column(String(255), nullable=True) - notification_status = Column(String(20), default="pending") # pending, sent, failed - status = Column(String(20), default="new") # new, success, decline, in_progress + notification_status = Column(Enum(NotificationStatus), default=NotificationStatus.pending) + status = Column(Enum(LeadStatus), default=LeadStatus.new) session_id = Column(String(100), nullable=True, index=True) woo_order_id = Column(String(50), nullable=True) - created_at = Column(DateTime, default=lambda: 
datetime.datetime.now(datetime.UTC)) + created_at = Column(DateTime, server_default=func.now()) diff --git a/app/schemas/chat.py b/app/schemas/chat.py index e8197fa..d88ffa3 100644 --- a/app/schemas/chat.py +++ b/app/schemas/chat.py @@ -97,6 +97,7 @@ class IntentContextResult: requires_lead: bool lead_form_type: Literal["contact", "checkout"] | None = None new_intent_type: str | None = None + viewed_products: list[str] | None = None class IntentDetectionResult(BaseModel): diff --git a/app/schemas/lead.py b/app/schemas/lead.py index 8a1d613..5bc258e 100644 --- a/app/schemas/lead.py +++ b/app/schemas/lead.py @@ -7,9 +7,9 @@ class ContactFormLead(BaseModel): - name: str = Field(..., max_length=50, pattern=r"^[A-Za-zА-Яа-яЄєІіЇїҐґ\s\-]+$") + name: str = Field(..., max_length=50, pattern=r"^[A-Za-zА-Яа-яЄєІіЇїҐґ\s\-']+$") surname: str | None = Field( - default=None, max_length=50, pattern=r"^[A-Za-zА-Яа-яЄєІіЇїҐґ\s\-]+$" + default=None, max_length=50, pattern=r"^[A-Za-zА-Яа-яЄєІіЇїҐґ\s\-']+$" ) phone_number: str = Field(..., max_length=50, description="Phone") contact_method: Literal["telegram", "viber", "phone"] diff --git a/app/schemas/order.py b/app/schemas/order.py index ba0cc5a..e128da0 100644 --- a/app/schemas/order.py +++ b/app/schemas/order.py @@ -32,3 +32,21 @@ class WooOrder(BaseModel): payment_method_title: str = "" shipping_lines: list[WooOrderShipping] = [] line_items: list[WooOrderLineItem] = [] + + +class WooOrderItem(BaseModel): + name: str + quantity: int + total: str + sku: str | None = None + + +class WooOrderPayload(BaseModel): + order_id: int | str + session_id: str + total: str + currency: str + first_name: str + last_name: str + phone: str + items: list[WooOrderItem] diff --git a/app/services/category_manager.py b/app/services/category_manager.py index 2e9bc20..2eb9111 100644 --- a/app/services/category_manager.py +++ b/app/services/category_manager.py @@ -1,6 +1,7 @@ import asyncio import csv -import os + +from anyio import Path as AnyioPath from 
app.core.logging_config import get_logger @@ -21,56 +22,70 @@ async def initialize(self) -> None: async def _load_categories(self) -> None: """Checks the file modification date and safely updates the cache.""" - async with self._lock: - # os.path.exists and os.path.getmtime are fast enough for the main thread - if not os.path.exists(self.csv_path): # noqa: ASYNC240 - logger.warning("Category CSV not found", path=self.csv_path) - return - + csv_path_obj = AnyioPath(self.csv_path) + if not await csv_path_obj.exists(): + logger.warning( + "Category CSV not found, attempting to fetch from WooCommerce...", + path=self.csv_path, + ) try: - mtime = os.path.getmtime(self.csv_path) # noqa: ASYNC240 - if mtime <= self._last_mtime: - return # File hasn't changed - - def _read_and_parse_csv() -> tuple[dict[str, int], list[str], int]: - new_map: dict[str, int] = {} - new_list: list[str] = [] - skipped_rows = 0 - - with open(self.csv_path, encoding="utf-8") as f: - reader = csv.DictReader(f) - for row in reader: - try: - count = int(row.get("Count", 0)) - if count > 0: - cat_id = int(row["ID"]) - name = row["Name"].strip() - new_map[name.lower()] = cat_id - new_list.append(name) - except (ValueError, KeyError, TypeError): - skipped_rows += 1 - continue - return new_map, new_list, skipped_rows - - new_map, new_list, skipped_rows = await asyncio.to_thread(_read_and_parse_csv) - - # Update state only if parsing was successful and there is data - if new_map: - self._categories_map = new_map - self._categories_list_str = ", ".join(new_list) - self._last_mtime = mtime - logger.info( - "Hot-reloaded categories", - count=len(self._categories_map), - skipped_rows=skipped_rows, - path=self.csv_path, - ) + from scripts.export_categories import export_categories_to_csv + await export_categories_to_csv() except Exception as e: + logger.error("Failed to automatically export categories", error=str(e)) + + if not await csv_path_obj.exists(): logger.error( - "Failed to load categories, keeping 
old cache", path=self.csv_path, error=str(e) + "Category CSV still not found after export attempt.", path=self.csv_path + ) + return + + try: + stat = await csv_path_obj.stat() + mtime = stat.st_mtime + if mtime <= self._last_mtime: + return # File hasn't changed + + def _read_and_parse_csv() -> tuple[dict[str, int], list[str], int]: + new_map: dict[str, int] = {} + new_list: list[str] = [] + skipped_rows = 0 + + with open(self.csv_path, encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + try: + count = int(row.get("Count", 0)) + if count > 0: + cat_id = int(row["ID"]) + name = row["Name"].strip() + new_map[name.lower()] = cat_id + new_list.append(name) + except (ValueError, KeyError, TypeError): + skipped_rows += 1 + continue + return new_map, new_list, skipped_rows + + new_map, new_list, skipped_rows = await asyncio.to_thread(_read_and_parse_csv) + + if new_map: + async with self._lock: + if mtime > self._last_mtime: + self._categories_map = new_map + self._categories_list_str = ", ".join(new_list) + self._last_mtime = mtime + logger.info( + "Hot-reloaded categories", + count=len(self._categories_map), + skipped_rows=skipped_rows, + path=self.csv_path, ) - # Do not clear the old cache, just log the error (prevent Race Condition) + + except Exception as e: + logger.error( + "Failed to load categories, keeping old cache", path=self.csv_path, error=str(e) + ) async def get_categories_string(self) -> str: """Returns a string of categories for injection into the LLM prompt.""" diff --git a/app/services/chat_memory_service.py b/app/services/chat_memory_service.py index 3e69947..c07fae5 100644 --- a/app/services/chat_memory_service.py +++ b/app/services/chat_memory_service.py @@ -1,3 +1,4 @@ +import json import logging from collections.abc import Callable from typing import Any @@ -5,7 +6,7 @@ from sqlalchemy import delete, select from app.core.db import AsyncSessionLocal, commit_with_retry -from app.models.chat_memory import ChatMessage +from 
app.models.chat_memory import ChatMessage, SessionState logger = logging.getLogger(__name__) @@ -14,6 +15,46 @@ class ChatMemoryService: def __init__(self, session_factory: Callable[..., Any] = AsyncSessionLocal) -> None: self.session_factory = session_factory + async def get_session_state(self, session_id: str) -> dict[str, Any]: + """Retrieves the current session state.""" + async with self.session_factory() as session: + stmt = select(SessionState).where(SessionState.session_id == session_id) + result = await session.execute(stmt) + state = result.scalar_one_or_none() + if state: + return { + "last_search_query": state.last_search_query, + "last_products": json.loads(state.last_products) if state.last_products else [], + } + return {"last_search_query": None, "last_products": []} + + async def update_session_state( + self, + session_id: str, + last_search_query: str | None = None, + last_products: list[str] | None = None, + ) -> None: + """Updates the current session state.""" + async with self.session_factory() as session: + try: + stmt = select(SessionState).where(SessionState.session_id == session_id) + result = await session.execute(stmt) + state = result.scalar_one_or_none() + + if not state: + state = SessionState(session_id=session_id) + session.add(state) + + if last_search_query is not None: + state.last_search_query = last_search_query # type: ignore + if last_products is not None: + state.last_products = json.dumps(last_products, ensure_ascii=False) # type: ignore + + await commit_with_retry(session) + except Exception as e: + logger.exception("Failed to update session state", extra={"error": str(e)}) + await session.rollback() + async def get_history( self, session_id: str, limit: int = 6, ignore_reset: bool = False ) -> list[dict[str, str]]: diff --git a/app/services/document_processor.py b/app/services/document_processor.py index 6b08e35..aa5f276 100644 --- a/app/services/document_processor.py +++ b/app/services/document_processor.py @@ -15,7 +15,8 
@@ def _parse_pdf(content: bytes) -> str: with pdfplumber.open(io.BytesIO(content)) as pdf: pages_text: list[str] = [] for page in pdf.pages: - page_text = str(page.extract_text()) if page.extract_text() else "" + extracted = page.extract_text() + page_text = str(extracted) if extracted else "" if page_text: pages_text.append(page_text) return "\n\n".join(pages_text) @@ -27,13 +28,31 @@ def _parse_docx(content: bytes) -> str: return "\n\n".join(paragraphs) +class FileSizeExceededError(ValueError): + pass + + +MAX_FILE_SIZE = 5 * 1024 * 1024 # 5 MB + + async def extract_text(file: UploadFile) -> str: """ Extract text from a file depending on its MIME type (PDF, TXT, DOCX, MD). - Loads the file into memory to avoid blocking the Event Loop. + Reads the file in chunks to prevent OOM. """ content_type = file.content_type - content = await file.read() + + content_array = bytearray() + chunk_size = 1024 * 1024 + while True: + chunk = await file.read(chunk_size) + if not chunk: + break + content_array.extend(chunk) + if len(content_array) > MAX_FILE_SIZE: + raise FileSizeExceededError("File size exceeds 5 MB limit") + + content = bytes(content_array) if not content: raise ValueError("Empty document") diff --git a/app/services/intent_detection_service.py b/app/services/intent_detection_service.py new file mode 100644 index 0000000..ac92eb0 --- /dev/null +++ b/app/services/intent_detection_service.py @@ -0,0 +1,131 @@ +import json +import logging +import re +from typing import Any + +from app.core.constants import ( + INTENT_CHECKOUT, + INTENT_CONTACT, + INTENT_FAQ, + INTENT_GENERAL, + INTENT_HYBRID, + INTENT_ORDER_STATUS, + INTENT_PRODUCT, + INTENT_SEARCH, +) +from app.schemas.chat import IntentDetectionResult +from app.services.category_manager import CategoryManager +from app.services.openai_service import OpenAIService +from app.utils.prompts import INTENT_ANALYZER_PROMPT + +logger = logging.getLogger(__name__) + + +class IntentDetectionService: + """Service responsible 
for parsing user queries and detecting intent.""" + + def __init__(self, openai_service: OpenAIService, category_manager: CategoryManager) -> None: + self.openai_service = openai_service + self.category_manager = category_manager + + async def detect_intent( + self, question: str, history_context: str, session_state_json: str + ) -> dict[str, Any]: + """Detects user intent using regex and LLM.""" + stripped_q = question.strip() + + # Regex checks for product codes + sku_match = re.search(r"\b\d{6}\b", stripped_q) + pn_match = re.search( + r"\b(?=[a-zA-Z0-9\-\.]*[a-zA-Z])(?=[a-zA-Z0-9\-\.]*\d)[a-zA-Z0-9\-\.]{5,25}\b", + stripped_q, + ) + + extracted_code = None + if sku_match: + extracted_code = sku_match.group(0) + elif pn_match: + extracted_code = pn_match.group(0) + + faq_triggers = [ + "доставка", + "оплата", + "гаранті", + "купити", + "замовити", + "наявн", + "скільки", + "як ", + ] + needs_llm = any(trigger in stripped_q.lower() for trigger in faq_triggers) + + # Fast-path for pure part numbers/SKUs + if extracted_code and not needs_llm and len(stripped_q) < 100: + return { + "intent": INTENT_SEARCH, + "product_name": None, + "strict_query": extracted_code, + "broad_query": stripped_q, + "category_query": None, + "normalized_faq_queries": [], + } + + # Fast-path for pure checkout intents + checkout_triggers = [ + "хочу замовити", + "замовити", + "оформити замовлення", + "беру", + "купую", + "оформити", + "хочу купити", + ] + if stripped_q.lower() in checkout_triggers: + return { + "intent": INTENT_CHECKOUT, + "product_name": None, # Will be filled by rag_engine session_state fallback + "strict_query": None, + "broad_query": None, + "category_query": None, + "normalized_faq_queries": [], + } + + categories_str = await self.category_manager.get_categories_string() + + prompt = INTENT_ANALYZER_PROMPT.format( + history_context=history_context if history_context else "None", + session_state_json=session_state_json, + question=question, + 
categories_str=categories_str, + intent_product=INTENT_PRODUCT, + intent_search=INTENT_SEARCH, + intent_checkout=INTENT_CHECKOUT, + intent_hybrid=INTENT_HYBRID, + intent_general=INTENT_GENERAL, + intent_contact=INTENT_CONTACT, + intent_order_status=INTENT_ORDER_STATUS, + ) + + try: + response = await self.openai_service.get_chat_completion( + system_prompt="You are a system JSON analyzer. Ensure you respond with valid JSON.", + user_message=prompt, + context_chunks=[], + response_format={"type": "json_object"}, + ) + cleaned = response.replace("```json", "").replace("```", "").strip() + parsed_json = IntentDetectionResult.model_validate_json(cleaned).model_dump() + + # Fallback override + if extracted_code and parsed_json.get("intent") in (INTENT_GENERAL, INTENT_FAQ): + parsed_json["intent"] = INTENT_SEARCH + parsed_json["strict_query"] = extracted_code + parsed_json["broad_query"] = stripped_q + + return parsed_json + except json.JSONDecodeError: + logger.exception("Intent detection JSON parsing failed") + return {"intent": INTENT_FAQ, "normalized_faq_queries": [question]} + except Exception: + logger.exception("Intent detection generic error") + return {"intent": INTENT_FAQ, "normalized_faq_queries": [question]} diff --git a/app/services/intent_handlers.py b/app/services/intent_handlers.py index ad89ad1..83feb37 100644 --- a/app/services/intent_handlers.py +++ b/app/services/intent_handlers.py @@ -1,4 +1,3 @@ -import html import logging from typing import Any @@ -6,10 +5,16 @@ from app.core.config import Settings from app.core.constants import ( - ALERT_MARGIN_ISSUE, - ALERT_SCRAPER_FAILED, BTN_CHANGE_PRICE, BTN_VIEW_PRODUCT, + FACT_CHECKOUT_PRODUCT, + FACT_INFO_PRODUCT, + INSTR_NO_PREPAYMENT, + INSTR_ORDER_ID_MISSING, + INSTR_ORDER_NOT_FOUND, + INSTR_ORDER_STATUS, + INSTR_PRICE_CHANGED_ALERT, + INSTR_SEARCH_FALLBACK, INTENT_CHECKOUT, INTENT_HYBRID, INTENT_PRODUCT, @@ -22,13 +27,12 @@ STATUS_OUT_OF_STOCK, ) from app.schemas.chat import IntentContextResult +from 
app.services.notification_builder import NotificationBuilder from app.services.price_comparator import PriceComparator from app.services.telegram_service import TelegramService from app.utils.prompts import ( INSTR_ALERT_FAILED, - INSTR_BROAD_SEARCH_FALLBACK, INSTR_CHECKOUT_TELEGRAM, - INSTR_FALLBACK_CATEGORY, INSTR_NO_DUPLICATE_LINKS, INSTR_NOTHING_FOUND, INSTR_PRICE_CHECKING, @@ -79,7 +83,7 @@ async def handle( result = await self.price_comparator.compare( product_name, is_checkout=is_checkout, category_id=category_id ) - except httpx.RequestError as e: + except Exception as e: logger.exception( "Price comparator failed during intent handling", extra={"error": str(e)} ) @@ -108,15 +112,10 @@ async def handle( if result.needs_alert: requires_lead = True lead_form_type = "contact" - safe_product_name = ( - html.escape(str(result.product_name)) - if result.product_name - else "Unknown Product" - ) if result.alert_reason in ["low_margin", "checkout_margin_issue"]: - msg = ALERT_MARGIN_ISSUE.format( - safe_product_name=safe_product_name, + msg = NotificationBuilder.build_margin_alert( + product_name=result.product_name, woo_price=result.woo_price, supplier_price=result.supplier_price_uah, diff_woo=result.diff_woo_uah, @@ -146,14 +145,12 @@ async def handle( if not alert_success: system_instructions.append(INSTR_ALERT_FAILED) else: - system_instructions.append( - "КРИТИЧНО: Ціна на товар змінилася і потребує уточнення. " - "КАТЕГОРИЧНО ЗАБОРОНЕНО пропонувати оформлення замовлення або давати посилання на чекаут. " - "Прямо зараз попроси клієнта залишити номер телефону тут або написати нашому менеджеру в Telegram/Viber для узгодження фінальної ціни." 
- ) + system_instructions.append(INSTR_PRICE_CHANGED_ALERT) product_facts.extend(attributes_facts) else: - msg = ALERT_SCRAPER_FAILED.format(safe_product_name=safe_product_name) + msg = NotificationBuilder.build_scraper_failed_alert( + product_name=result.product_name + ) alert_success = await self.telegram_service.send_alert( msg, alert_type="error", session_id=session_id ) @@ -186,9 +183,20 @@ async def handle( if result.availability_status == "instock" else STATUS_OUT_OF_STOCK ) + + if ( + result.availability_status == "instock" + and result.woo_price is not None + and result.woo_price < 40000 + ): + system_instructions.append(INSTR_NO_PREPAYMENT) + product_facts.append( - f"Data: Product '{result.product_name}', актуальна та підтверджена ціна {result.woo_price} UAH. Conditions: {status_text}.\n" - f"CRITICAL: Якщо товар підтверджено, запропонуй оформити замовлення. If 'In stock' - 1-3 days. 'Under order' - 14-20 days. DO NOT PUT LINKS IN TEXT!" + FACT_CHECKOUT_PRODUCT.format( + product_name=result.product_name, + woo_price=result.woo_price, + status_text=status_text, + ) ) product_facts.extend(attributes_facts) @@ -219,10 +227,17 @@ async def handle( if result.availability_status == "instock" else STATUS_OUT_OF_STOCK ) + + if result.availability_status == "instock" and result.woo_price < 40000: + system_instructions.append(INSTR_NO_PREPAYMENT) + # For a normal request, the price is always "Estimated" product_facts.append( - f"Data: Product '{result.product_name}', Ціна {result.woo_price} UAH. Conditions: {status_text}.\n" - f"CRITICAL: If 'In stock' - 1-3 days. 'Under order' - 14-20 days. DO NOT PUT LINKS IN TEXT!" 
+ FACT_INFO_PRODUCT.format( + product_name=result.product_name, + woo_price=result.woo_price, + status_text=status_text, + ) ) product_facts.extend(attributes_facts) else: @@ -232,6 +247,10 @@ async def handle( ) new_intent_type = INTENT_SEARCH + viewed_products = ( + [result.product_name] if "result" in locals() and result and result.product_name else [] + ) + return IntentContextResult( product_facts=product_facts, system_instructions=system_instructions, @@ -239,6 +258,7 @@ async def handle( requires_lead=requires_lead, lead_form_type=lead_form_type, new_intent_type=new_intent_type, + viewed_products=viewed_products, ) @@ -347,7 +367,7 @@ async def handle( ) if product.attributes: - top_attrs = list(product.attributes.items())[:5] + top_attrs = list(product.attributes.items())[:15] attr_str = "\n".join(f" * {k}: {v}" for k, v in top_attrs) search_facts.append(f" Characteristics:\n{attr_str}") @@ -358,14 +378,8 @@ async def handle( search_facts.append(INSTR_NO_DUPLICATE_LINKS) product_facts.append("\n".join(search_facts)) - if is_fallback_category: - system_instructions.append(INSTR_FALLBACK_CATEGORY) - elif is_fallback_broad: - system_instructions.append( - INSTR_BROAD_SEARCH_FALLBACK.format( - broad_term=broad_term, strict_term=strict_term - ) - ) + if is_fallback_category or is_fallback_broad: + system_instructions.append(INSTR_SEARCH_FALLBACK) else: system_instructions.append( INSTR_PRODUCT_FOUND.format(product_name=woo_products[0].name) @@ -374,12 +388,15 @@ async def handle( requires_lead = True system_instructions.append(INSTR_NOTHING_FOUND) + viewed_products = [p.name for p in woo_products] if woo_products else [] + return IntentContextResult( product_facts=product_facts, system_instructions=system_instructions, extracted_links=extracted_links, requires_lead=requires_lead, lead_form_type=lead_form_type, + viewed_products=viewed_products, ) @@ -405,9 +422,7 @@ async def handle( order_id_str = match.group(0) if not order_id_str or not 
order_id_str.isdigit(): - system_instructions.append( - "Не вдалося визначити номер замовлення. Попроси клієнта вказати точний номер замовлення (тільки цифри)." - ) + system_instructions.append(INSTR_ORDER_ID_MISSING) return IntentContextResult( product_facts=product_facts, system_instructions=system_instructions, @@ -425,10 +440,39 @@ async def handle( order_data = None if not order_data: - system_instructions.append( - f"Замовлення з номером {order_id} не знайдено. Перепроси та спитай, чи можливо клієнт помилився цифрою або номером." - ) + system_instructions.append(INSTR_ORDER_NOT_FOUND.format(order_id=order_id)) else: + extracted_phone = intent_data.get("phone", "") + extracted_phone_clean = re.sub(r"\D", "", str(extracted_phone)) + billing_phone = order_data.get("billing", {}).get("phone", "") + billing_phone_clean = re.sub(r"\D", "", str(billing_phone)) + + if not extracted_phone_clean: + system_instructions.append( + f"Скажіть користувачу: 'Для перевірки статусу замовлення #{order_id}, з метою безпеки, вкажіть номер телефону, на який було оформлено замовлення.'" + ) + return IntentContextResult( + product_facts=product_facts, + system_instructions=system_instructions, + extracted_links=[], + requires_lead=False, + lead_form_type=None, + new_intent_type=None, + ) + + if not billing_phone_clean or extracted_phone_clean[-9:] != billing_phone_clean[-9:]: + system_instructions.append( + f"Скажіть користувачу: 'Вказаний номер телефону не збігається з номером у замовленні #{order_id}. Доступ заборонено.'" + ) + return IntentContextResult( + product_facts=product_facts, + system_instructions=system_instructions, + extracted_links=[], + requires_lead=False, + lead_form_type=None, + new_intent_type=None, + ) + status_map = { "pending": "Очікує оплати", "processing": "В обробці", @@ -456,9 +500,7 @@ async def handle( fact += f" Товари: {items_str}." product_facts.append(fact) - system_instructions.append( - "Клієнт запитує про своє замовлення. 
class LeadService:
    """Persist sales leads and notify managers about them through Telegram.

    The service owns every write to the ``Lead`` table that originates from
    the contact form, the chat widget and the WooCommerce order webhook, and
    keeps each lead's ``notification_status`` in sync with the outcome of the
    Telegram alert.
    """

    # NOTE(review): the original code searched with an empty pattern and then
    # read ``match.group(1)``, which always raises IndexError. The intended
    # pattern appears to have been lost; a generic http(s) URL matcher is used
    # instead -- confirm against the format written to chat memory.
    _PRODUCT_URL_RE = re.compile(r"https?://[^\s<>\"')\]]+")

    def __init__(
        self, telegram_service: TelegramService, chat_memory_service: ChatMemoryService
    ) -> None:
        self.telegram_service = telegram_service
        self.chat_memory_service = chat_memory_service

    @staticmethod
    def _esc(value: object) -> str:
        """Return *value* as text that is safe to embed in an HTML-formatted alert."""
        import html

        return html.escape(str(value))

    @classmethod
    def _extract_product_url(cls, history: list[dict[str, str]] | None) -> str | None:
        """Return the most recent URL mentioned by the bot, without its query string.

        Args:
            history: Chat messages, oldest first, each with ``role`` and ``content``.

        Returns:
            The URL from the newest bot message that contains one, or ``None``.
        """
        for entry in reversed(history or []):
            if entry.get("role") != "bot":
                continue
            match = cls._PRODUCT_URL_RE.search(str(entry.get("content", "")))
            if not match:
                continue
            parsed = urlparse(match.group(0).rstrip(".,;:!?"))
            # Drop the query (tracking parameters) but keep everything else.
            return urlunparse(
                (parsed.scheme, parsed.netloc, parsed.path, parsed.params, "", parsed.fragment)
            )
        return None

    async def create_contact_lead(
        self, lead_data: ContactFormLead, client_ip: str
    ) -> tuple[int, str, str]:
        """Store a contact-form lead and build the alert text for it.

        Args:
            lead_data: Validated contact form payload.
            client_ip: Address of the submitting client, shown in the alert.

        Returns:
            ``(lead_id, message, alert_type)`` where ``alert_type`` is
            ``"hot_lead"`` for checkout leads and ``"lead"`` otherwise.
        """
        async with AsyncSessionLocal() as session:
            db_lead = Lead(
                name=lead_data.name,
                surname=lead_data.surname,
                phone_number=lead_data.phone_number,
                contact_method=lead_data.contact_method,
                lead_type=lead_data.lead_type,
                delivery_address=lead_data.delivery_address,
                notification_status="pending",
                session_id=lead_data.session_id,
            )
            session.add(db_lead)
            await commit_with_retry(session)
            await session.refresh(db_lead)
            lead_id = int(db_lead.id)  # type: ignore

        now_str = datetime.now(ZoneInfo("Europe/Kyiv")).strftime("%d.%m.%Y %H:%M")
        # User-supplied values are escaped the same way the other alert
        # builders in this code base escape theirs, so markup characters in a
        # name or address cannot break or inject into the alert.
        esc = self._esc

        if lead_data.lead_type == "checkout":
            message = ALERT_HOT_LEAD.format(
                lead_id=lead_id,
                now_str=now_str,
                name=esc(lead_data.name),
                surname=esc(lead_data.surname or ""),
                phone=esc(lead_data.phone_number),
                method=esc(lead_data.contact_method),
                address=esc(lead_data.delivery_address or "Не вказана"),
                ip=esc(client_ip),
            )
            leads_created_total.labels(type="checkout", status="success").inc()
            return lead_id, message, "hot_lead"

        message = ALERT_BOT_LEAD.format(
            lead_id=lead_id,
            now_str=now_str,
            name=esc(lead_data.name),
            phone=esc(lead_data.phone_number),
            method=esc(lead_data.contact_method),
            ip=esc(client_ip),
        )
        leads_created_total.labels(type="contact", status="success").inc()
        return lead_id, message, "lead"

    async def process_lead_background(
        self, lead_id: int, message: str, alert_type: str = "lead", session_id: str | None = None
    ) -> None:
        """Send the Telegram alert for a lead and record whether it was delivered.

        Args:
            lead_id: Primary key of the lead the alert belongs to.
            message: Pre-formatted alert text.
            alert_type: Telegram topic selector passed through to ``send_alert``.
            session_id: Chat session used to look up a product link, if any.
        """
        reply_markup: dict[str, list[list[dict[str, str]]]] = {
            "inline_keyboard": [
                [
                    {"text": BTN_SUCCESS, "callback_data": f"lead_status:{lead_id}:success"},
                    {"text": BTN_DECLINE, "callback_data": f"lead_status:{lead_id}:decline"},
                ],
                [{"text": BTN_IN_PROGRESS, "callback_data": f"lead_status:{lead_id}:in_progress"}],
            ]
        }

        try:
            if session_id:
                history = await self.chat_memory_service.get_history(session_id, limit=10)
                product_url = self._extract_product_url(history)
                if product_url:
                    reply_markup["inline_keyboard"].insert(
                        0, [{"text": BTN_PRODUCT_LINK, "url": product_url}]
                    )

            # send_alert reports delivery through its return value (the order
            # webhook handler below relies on the same contract), so a falsy
            # result must not be recorded as "sent".
            alert_sent = await self.telegram_service.send_alert(
                message, alert_type=alert_type, reply_markup=reply_markup, session_id=session_id
            )
            status = "sent" if alert_sent else "failed"
        except Exception as exc:
            with sentry_sdk.isolation_scope() as scope:
                scope.set_tag("lead_id", str(lead_id))
                scope.set_tag("retries_exhausted", str(isinstance(exc, RetryError)))
                sentry_sdk.capture_exception(exc)
            status = "failed"

        # A fresh session is used for the status write so that it still works
        # when the failure above came from the database layer.
        try:
            await self.update_lead_notification_status(lead_id, status)
        except Exception:
            logger.exception("Failed to persist lead notification status")

    async def process_woo_order_background(self, payload: WooOrderPayload) -> None:
        """Record a WooCommerce order as a converted lead and alert the managers.

        Errors are logged and swallowed because this runs as a background task
        behind the webhook response.
        """
        esc = self._esc
        async with AsyncSessionLocal() as session:
            try:
                db_lead = Lead(
                    name=payload.first_name,
                    surname=payload.last_name,
                    phone_number=payload.phone,
                    contact_method="woo_checkout",
                    lead_type="checkout",
                    session_id=payload.session_id,
                    woo_order_id=str(payload.order_id),
                    status="success",  # Immediate success because it's a sale
                    notification_status="pending",
                )
                session.add(db_lead)
                await commit_with_retry(session)
                leads_created_total.labels(type="conversion", status="success").inc()

                items_lines: list[str] = []
                for item in payload.items:
                    sku_info = f" (Арт: {esc(item.sku)})" if item.sku else ""
                    items_lines.append(
                        f"- {esc(item.name)}{sku_info} (x{item.quantity}) - {item.total} {payload.currency}"
                    )
                message = ALERT_WOO_ORDER.format(
                    order_id=payload.order_id,
                    first_name=esc(payload.first_name),
                    last_name=esc(payload.last_name),
                    phone=esc(payload.phone),
                    total=payload.total,
                    currency=payload.currency,
                    items_str="\n".join(items_lines),
                    session_id=esc(payload.session_id),
                )

                alert_success = await self.telegram_service.send_alert(
                    message, alert_type="conversion", session_id=payload.session_id
                )

                db_lead.notification_status = "sent" if alert_success else "failed"  # type: ignore
                await commit_with_retry(session)
            except Exception:
                logger.exception("Error processing woo order webhook")

    async def create_chat_lead(self, phone_number: str, lead_type: str, session_id: str) -> int:
        """Create a minimal lead for a phone number captured in chat and return its id."""
        async with AsyncSessionLocal() as session:
            db_lead = Lead(
                name="Клієнт з чату",
                phone_number=phone_number,
                contact_method="chat",
                lead_type=lead_type,
                session_id=session_id,
                status="new",
                notification_status="pending",
            )
            session.add(db_lead)
            await commit_with_retry(session)
            await session.refresh(db_lead)
            return int(db_lead.id)  # type: ignore

    async def update_lead_notification_status(self, lead_id: int, status: str) -> None:
        """Set ``notification_status`` of the given lead; do nothing if it does not exist."""
        async with AsyncSessionLocal() as session:
            lead = await session.get(Lead, lead_id)
            if lead:
                lead.notification_status = status  # type: ignore
                await commit_with_retry(session)
product_name else "Unknown Product" + return ALERT_MARGIN_ISSUE.format( + safe_product_name=safe_product_name, + woo_price=woo_price, + supplier_price=supplier_price, + diff_woo=diff_woo, + margin_threshold=margin_threshold, + ) + + @staticmethod + def build_scraper_failed_alert(product_name: str | None) -> str: + safe_product_name = html.escape(str(product_name)) if product_name else "Unknown Product" + return ALERT_SCRAPER_FAILED.format(safe_product_name=safe_product_name) diff --git a/app/services/openai_service.py b/app/services/openai_service.py index 8f9db7c..991cd25 100644 --- a/app/services/openai_service.py +++ b/app/services/openai_service.py @@ -18,13 +18,7 @@ logger = get_logger(__name__) -# Prompt and completion cost per 1M tokens in USD -MODEL_PRICING: dict[str, tuple[float, float]] = { - "gpt-4o": (5.0, 15.0), - "gpt-4o-mini": (0.150, 0.600), - "text-embedding-3-small": (0.02, 0.0), - "text-embedding-3-large": (0.13, 0.0), -} +logger = get_logger(__name__) class OpenAIService: @@ -37,11 +31,20 @@ def __init__(self, settings: Settings) -> None: self.embedding_model = settings.embedding_model self.openai_model = settings.openai_model self.max_tokens_response = settings.max_tokens_response + self.model_pricing = settings.model_pricing def _calculate_cost(self, model: str, prompt_tokens: int, completion_tokens: int) -> float: - prices = MODEL_PRICING.get(model, (0.0, 0.0)) + prices = self.model_pricing.get(model, (0.0, 0.0)) return (prompt_tokens * prices[0] / 1000000) + (completion_tokens * prices[1] / 1000000) + def _record_metrics(self, model: str, prompt_tokens: int, completion_tokens: int = 0) -> None: + llm_tokens_total.labels(model=model, token_type="prompt").inc(prompt_tokens) # noqa: S106 + if completion_tokens > 0: + llm_tokens_total.labels(model=model, token_type="completion").inc(completion_tokens) # noqa: S106 + cost = self._calculate_cost(model, prompt_tokens, completion_tokens) + if cost > 0: + 
llm_cost_usd_total.labels(model=model).inc(cost) + @retry( wait=wait_exponential(multiplier=1, min=2, max=10), stop=stop_after_attempt(3), @@ -64,11 +67,7 @@ async def generate_embedding(self, text: str) -> list[float]: llm_requests_total.labels(model=self.embedding_model, status="success").inc() if response.usage: - tokens = response.usage.prompt_tokens - llm_tokens_total.labels(model=self.embedding_model, token_type="prompt").inc(tokens) # noqa: S106 - cost = self._calculate_cost(self.embedding_model, tokens, 0) - if cost > 0: - llm_cost_usd_total.labels(model=self.embedding_model).inc(cost) + self._record_metrics(self.embedding_model, response.usage.prompt_tokens, 0) return response.data[0].embedding except RateLimitError: @@ -104,11 +103,7 @@ async def generate_embeddings_batch(self, texts: Sequence[str]) -> list[list[flo llm_requests_total.labels(model=self.embedding_model, status="success").inc() if response.usage: - tokens = response.usage.prompt_tokens - llm_tokens_total.labels(model=self.embedding_model, token_type="prompt").inc(tokens) # noqa: S106 - cost = self._calculate_cost(self.embedding_model, tokens, 0) - if cost > 0: - llm_cost_usd_total.labels(model=self.embedding_model).inc(cost) + self._record_metrics(self.embedding_model, response.usage.prompt_tokens, 0) sorted_data = sorted(response.data, key=lambda x: x.index) return [item.embedding for item in sorted_data] @@ -168,17 +163,12 @@ async def get_chat_completion( llm_requests_total.labels(model=self.openai_model, status="success").inc() if response.usage: - p_tokens = response.usage.prompt_tokens - c_tokens = response.usage.completion_tokens - llm_tokens_total.labels(model=self.openai_model, token_type="prompt").inc(p_tokens) # noqa: S106 - llm_tokens_total.labels(model=self.openai_model, token_type="completion").inc( # noqa: S106 - c_tokens + self._record_metrics( + self.openai_model, + response.usage.prompt_tokens, + response.usage.completion_tokens, ) - cost = 
self._calculate_cost(self.openai_model, p_tokens, c_tokens) - if cost > 0: - llm_cost_usd_total.labels(model=self.openai_model).inc(cost) - content = response.choices[0].message.content return content if content else "" except RateLimitError: diff --git a/app/services/rag_engine.py b/app/services/rag_engine.py index 685d95b..b15a971 100644 --- a/app/services/rag_engine.py +++ b/app/services/rag_engine.py @@ -15,10 +15,10 @@ BTN_SUCCESS_SHORT, CHAT_HISTORY_MARKER, CHAT_QUERY_MARKER, + INSTR_NO_PHONE_IN_CHAT, INTENT_CHECKOUT, INTENT_CONTACT, INTENT_FAQ, - INTENT_GENERAL, INTENT_HYBRID, INTENT_ORDER_STATUS, INTENT_PRODUCT, @@ -31,14 +31,10 @@ MSG_STREAM_FAILED, REGEX_CLEAN_QUERY, REGEX_PHONE, - REGEX_PRODUCT_NAME_HISTORY, ) -from app.core.db import AsyncSessionLocal, commit_with_retry from app.core.logging_config import get_logger -from app.models.lead import Lead from app.schemas.chat import ( IntentContextResult, - IntentDetectionResult, LinkItem, PipelineContext, RAGResponse, @@ -57,7 +53,6 @@ from app.services.vector_service import VectorService from app.utils.prompts import ( INSTR_CONTACT_MANAGER, - INTENT_ANALYZER_PROMPT, NO_CONTEXT_RESPONSE, RAG_SYSTEM_PROMPT, ) @@ -82,6 +77,11 @@ def __init__( guardrails_service: GuardrailsService, chat_memory_service: ChatMemoryService, settings: Settings, + lead_service: Any = None, + intent_detection_service: Any = None, + product_intent_handler: Any = None, + search_intent_handler: Any = None, + order_status_intent_handler: Any = None, ) -> None: self.openai_service = openai_service self.vector_service = vector_service @@ -91,104 +91,27 @@ def __init__( self.guardrails_service = guardrails_service self.chat_memory_service = chat_memory_service self.settings = settings - self.top_k = settings.top_k_results - self.threshold = settings.similarity_threshold - self.product_intent_handler = ProductCheckoutIntentHandler( - price_comparator=price_comparator, telegram_service=telegram_service, settings=settings - ) - 
self.search_intent_handler = SearchIntentHandler(price_comparator=price_comparator) - self.order_status_intent_handler = OrderStatusIntentHandler( - woo_service=price_comparator.woo_service - ) - - async def detect_intent(self, question: str, history_context: str) -> dict[str, Any]: - """ - Detects user intent using an LLM based on the conversation history and current question. + from app.services.lead_service import LeadService - Args: - question: The user's query. - history_context: String representation of the chat history. + self.lead_service = lead_service or LeadService(telegram_service, chat_memory_service) - Returns: - Dictionary containing 'intent', 'product_name', 'strict_query', - 'broad_query', 'category_query', and 'normalized_faq_queries'. - """ - stripped_q = question.strip() + from app.services.intent_detection_service import IntentDetectionService - # Regex checks for product codes - sku_match = re.search(r"\b\d{6}\b", stripped_q) - pn_match = re.search( - r"\b(?=[a-zA-Z0-9\-\.]*[a-zA-Z])(?=[a-zA-Z0-9\-\.]*\d)[a-zA-Z0-9\-\.]{5,25}\b", - stripped_q, + self.intent_detection_service = intent_detection_service or IntentDetectionService( + openai_service, category_manager ) - extracted_code = None - if sku_match: - extracted_code = sku_match.group(0) - elif pn_match: - extracted_code = pn_match.group(0) - - faq_triggers = [ - "доставка", - "оплата", - "гаранті", - "купити", - "замовити", - "наявн", - "скільки", - "як ", - ] - needs_llm = any(trigger in stripped_q.lower() for trigger in faq_triggers) - - # Fast-path for pure part numbers/SKUs - if extracted_code and not needs_llm and len(stripped_q) < 100: - return { - "intent": INTENT_SEARCH, - "product_name": None, - "strict_query": extracted_code, - "broad_query": stripped_q, - "category_query": None, - "normalized_faq_queries": [], - } - - categories_str = await self.category_manager.get_categories_string() - - prompt = INTENT_ANALYZER_PROMPT.format( - history_context=history_context if 
history_context else "None", - question=question, - categories_str=categories_str, - intent_product=INTENT_PRODUCT, - intent_search=INTENT_SEARCH, - intent_checkout=INTENT_CHECKOUT, - intent_hybrid=INTENT_HYBRID, - intent_general=INTENT_GENERAL, - intent_contact=INTENT_CONTACT, - intent_order_status=INTENT_ORDER_STATUS, + self.top_k = settings.top_k_results + self.threshold = settings.similarity_threshold + self.product_intent_handler = product_intent_handler or ProductCheckoutIntentHandler( + price_comparator=price_comparator, telegram_service=telegram_service, settings=settings + ) + self.search_intent_handler = search_intent_handler or SearchIntentHandler( + price_comparator=price_comparator + ) + self.order_status_intent_handler = order_status_intent_handler or OrderStatusIntentHandler( + woo_service=price_comparator.woo_service ) - - try: - response = await self.openai_service.get_chat_completion( - system_prompt="You are a system JSON analyzer. Ensure you respond with valid JSON.", - user_message=prompt, - context_chunks=[], - response_format={"type": "json_object"}, - ) - cleaned = response.replace("```json", "").replace("```", "").strip() - parsed_json = IntentDetectionResult.model_validate_json(cleaned).model_dump() - - # Fallback override - if extracted_code and parsed_json.get("intent") in (INTENT_GENERAL, INTENT_FAQ): - parsed_json["intent"] = INTENT_SEARCH - parsed_json["strict_query"] = extracted_code - parsed_json["broad_query"] = stripped_q - - return parsed_json - except json.JSONDecodeError: - logger.exception("Intent detection JSON parsing failed") - return {"intent": INTENT_FAQ, "normalized_faq_queries": [question]} - except Exception: - logger.exception("Intent detection generic error") - return {"intent": INTENT_FAQ, "normalized_faq_queries": [question]} async def _retrieve_context( self, search_query: str, precomputed_vector: list[float] | None = None @@ -242,24 +165,6 @@ async def _get_intent_context( intent_type = intent_data.get("intent", 
INTENT_FAQ) product_name = intent_data.get("product_name") - # Context recovery from history - if (not product_name or product_name == "FOUND_NAME_FROM_HISTORY") and intent_type in [ - INTENT_PRODUCT, - INTENT_CHECKOUT, - INTENT_HYBRID, - ]: - for msg in reversed(history): - content = msg.get("content", "") - if msg.get("role") == "bot" and "Товар" in content: - match = re.search(REGEX_PRODUCT_NAME_HISTORY, content) - if match: - product_name = match.group(1) - logger.info( - "Recovered product name from bot history", - extra={"product_name": product_name}, - ) - break - category_term = str(intent_data.get("category_query") or "").strip() category_id = ( await self.category_manager.get_category_id(category_term) if category_term else None @@ -362,28 +267,21 @@ async def _try_capture_lead( lead_type = "price_clarification" if is_price_clarification else "contact" try: - async with AsyncSessionLocal() as session: - db_lead = Lead( - name="Клієнт з чату", - phone_number=phone_number, - contact_method="chat", - lead_type=lead_type, - session_id=session_id, - status="new", - notification_status="pending", - ) - session.add(db_lead) - await commit_with_retry(session) - await session.refresh(db_lead) - lead_id = int(db_lead.id) # type: ignore + lead_id = await self.lead_service.create_chat_lead( + phone_number=phone_number, lead_type=lead_type, session_id=session_id + ) + + import html + + safe_question = html.escape(question) if is_price_clarification: message = ALERT_PRICE_CLARIFICATION.format( - lead_id=lead_id, phone=phone_number, question=question + lead_id=lead_id, phone=phone_number, question=safe_question ) else: message = ALERT_CHAT_LEAD.format( - lead_id=lead_id, phone=phone_number, question=question + lead_id=lead_id, phone=phone_number, question=safe_question ) # Safe button formatting: avoid AttributeError and typing errors @@ -419,11 +317,9 @@ async def _try_capture_lead( message, alert_type="lead", session_id=session_id, reply_markup=reply_markup ) - async 
with AsyncSessionLocal() as session: - lead = await session.get(Lead, lead_id) - if lead: - lead.notification_status = "sent" if lead_success else "failed" # type: ignore - await commit_with_retry(session) + await self.lead_service.update_lead_notification_status( + lead_id=lead_id, status="sent" if lead_success else "failed" + ) msg = MSG_LEAD_SUCCESS if lead_success else MSG_LEAD_FAILED await self.chat_memory_service.add_interaction(session_id, question, msg) @@ -480,6 +376,9 @@ async def _prepare_rag_pipeline( if is_lead and lead_ctx: return lead_ctx + session_state = await self.chat_memory_service.get_session_state(session_id) + session_state_json = json.dumps(session_state, ensure_ascii=False) + history = await self.chat_memory_service.get_history(session_id, limit=6) history_context = "\n".join( [ @@ -504,18 +403,25 @@ async def _prepare_rag_pipeline( if faq_results and faq_results[0].get("metadata", {}).get("source") == "store_policy": intent_data = {"intent": INTENT_FAQ, "normalized_faq_queries": [question]} else: - intent_data = await self.detect_intent(question, history_context) + intent_data = await self.intent_detection_service.detect_intent( + question, history_context, session_state_json + ) intent_type = intent_data.get("intent", INTENT_FAQ) + # Fallback for missing product name if intent analyzer failed to extract it + if not intent_data.get("product_name") and intent_type in [ + INTENT_PRODUCT, + INTENT_HYBRID, + INTENT_CHECKOUT, + ]: + last_prods = cast(list[str], session_state.get("last_products", [])) + if last_prods: + intent_data["product_name"] = last_prods[-1] + if intent_type in [INTENT_PRODUCT, INTENT_SEARCH, INTENT_CHECKOUT]: intent_data["normalized_faq_queries"] = [] - if intent_type == INTENT_SEARCH: - await self.chat_memory_service.reset_rag_context(session_id) - history_context = "" - history = [] - raw_queries = intent_data.get("normalized_faq_queries", []) valid_queries: list[str] = [] if isinstance(raw_queries, str): @@ -530,6 
+436,18 @@ async def _prepare_rag_pipeline( vector_tasks = [] if valid_queries: + # First generate all embeddings in one batch call + queries_to_embed = [q for q in valid_queries if q not in embedding_cache] + if queries_to_embed: + try: + batch_embeddings = await self.openai_service.generate_embeddings_batch( + queries_to_embed + ) + for q, emb in zip(queries_to_embed, batch_embeddings, strict=False): + embedding_cache[q] = emb + except Exception as e: + logger.warning("Batch embedding failed", error=str(e)) + vector_tasks = [ self._retrieve_context(nq, precomputed_vector=embedding_cache.get(nq)) for nq in valid_queries @@ -579,7 +497,20 @@ async def fetch_vectors() -> list[ requires_lead = intent_results.requires_lead lead_form_type = intent_results.lead_form_type + if intent_results.viewed_products or intent_type == INTENT_SEARCH: + search_term = str( + intent_data.get("strict_query") or intent_data.get("broad_query") or "" + ) + await self.chat_memory_service.update_session_state( + session_id, + last_search_query=search_term, + last_products=intent_results.viewed_products, + ) + prepended_context: list[str] = [] + if requires_lead: + system_instructions.append(INSTR_NO_PHONE_IN_CHAT) + if system_instructions: prepended_context.append("\n".join(system_instructions)) if product_facts: @@ -640,6 +571,20 @@ async def process_query( lead_form_type=None, ) + if ctx.lead_form_type == "checkout": + hardcoded_msg = "Чудово! Товар доступний до замовлення.\n\nЩоб безпечно оформити покупку, обрати спосіб доставки та оплати — натисніть кнопку **Оформити замовлення** нижче.\n\nТакож ви можете заповнити форму контактів або написати нашому менеджеру в Telegram/Viber для уточнення деталей." 
+ await self.chat_memory_service.add_interaction(session_id, question, hardcoded_msg) + return RAGResponse.model_validate( + { + "answer": hardcoded_msg, + "sources": ctx.sources, + "has_context": True, + "links": ctx.extracted_links, + "requires_lead": ctx.requires_lead, + "lead_form_type": ctx.lead_form_type, + } + ) + try: answer = await self.openai_service.get_chat_completion( system_prompt=RAG_SYSTEM_PROMPT, @@ -707,6 +652,28 @@ async def process_query_stream( ) yield f"[METADATA] {meta_payload}" + if ctx.lead_form_type == "checkout": + hardcoded_msg = "Чудово! Товар доступний до замовлення.\n\nЩоб безпечно оформити покупку, обрати спосіб доставки та оплати — натисніть кнопку **Оформити замовлення** нижче.\n\nТакож ви можете заповнити форму контактів або написати нашому менеджеру в Telegram/Viber для уточнення деталей." + yield json.dumps({"token": hardcoded_msg}, ensure_ascii=False) + + links_metadata = "" + if ctx.extracted_links: + for link in ctx.extracted_links: + if link.get("url"): + links_metadata += f" " + + bg_tasks: set[asyncio.Task[Any]] = getattr(self, "_bg_tasks", set()) + if not hasattr(self, "_bg_tasks"): + self._bg_tasks = bg_tasks + t_checkout = asyncio.create_task( + self.chat_memory_service.add_interaction( + session_id, question, hardcoded_msg + links_metadata + ) + ) + bg_tasks.add(t_checkout) + t_checkout.add_done_callback(bg_tasks.discard) + return + response_tokens: list[str] = [] stream: AsyncGenerator[str] | None = None full_response = "" diff --git a/app/services/statistics_service.py b/app/services/statistics_service.py index 588b74d..621ac62 100644 --- a/app/services/statistics_service.py +++ b/app/services/statistics_service.py @@ -1,7 +1,6 @@ import asyncio import logging from datetime import UTC, datetime, timedelta -from typing import Any import httpx from sqlalchemy import func, select @@ -83,13 +82,15 @@ async def gather_and_send_daily_report_job() -> None: woo_service = WooService(settings) # 1. 
Prometheus Metrics - prom_stats = await fetch_prometheus_data(settings) + prom_task = asyncio.create_task(fetch_prometheus_data(settings)) # 2. SQLite (Unique users) - unique_users = await fetch_unique_users_24h() + users_task = asyncio.create_task(fetch_unique_users_24h()) # 3. WooCommerce - woo_stats: dict[str, Any] = await woo_service.get_daily_orders_stats() + woo_task = asyncio.create_task(woo_service.get_daily_orders_stats()) + + prom_stats, unique_users, woo_stats = await asyncio.gather(prom_task, users_task, woo_task) # Message formatting date_str = datetime.now(UTC).strftime("%d.%m.%Y") @@ -130,5 +131,8 @@ async def gather_and_send_daily_report_job() -> None: message = "\n".join(msg_lines) # Send to 'stat' topic - await telegram_service.send_alert(message, alert_type="stat") - logger.info("Daily statistics report sent.") + try: + await telegram_service.send_alert(message, alert_type="stat") + logger.info("Daily statistics report sent.") + finally: + await woo_service.close() diff --git a/app/services/telegram_service.py b/app/services/telegram_service.py index 57c0503..f022ab0 100644 --- a/app/services/telegram_service.py +++ b/app/services/telegram_service.py @@ -74,10 +74,9 @@ async def _make_request( f"Failed TG request to {endpoint} (Attempt {attempt + 1}/{retries})", response=resp.text, ) - except Exception as e: - logger.error( - f"Error in TG request to {endpoint} (Attempt {attempt + 1}/{retries})", - error=str(e), + except Exception: + logger.exception( + f"Error in TG request to {endpoint} (Attempt {attempt + 1}/{retries})" ) if attempt < retries - 1: @@ -213,3 +212,12 @@ async def send_document( resp = await self._make_request("sendDocument", data=data, files=files, retries=3) return resp is not None + + async def answer_callback_query(self, callback_query_id: str, text: str = "") -> bool: + """Answer a Telegram callback query.""" + if not self.api_base: + return False + + payload: dict[str, Any] = {"callback_query_id": callback_query_id, "text": 
text} + resp = await self._make_request("answerCallbackQuery", payload=payload, retries=3) + return resp is not None diff --git a/app/services/vector_service.py b/app/services/vector_service.py index 88a383c..2308266 100644 --- a/app/services/vector_service.py +++ b/app/services/vector_service.py @@ -25,11 +25,12 @@ def __init__(self, settings: Settings) -> None: self.dimension = settings.pinecone_dimension self.metric = settings.pinecone_metric - self.index: Any = self.pc.Index(self.index_name) # type: ignore[reportUnknownMemberType] + self.index: Any = None async def initialize(self) -> None: """Initialize the vector service by ensuring the index exists.""" await asyncio.to_thread(self.ensure_index_exists) + self.index = self.pc.Index(self.index_name) # type: ignore[reportUnknownMemberType] def ensure_index_exists(self) -> None: """ diff --git a/app/services/woo_service.py b/app/services/woo_service.py index 2b32558..a75d3e1 100644 --- a/app/services/woo_service.py +++ b/app/services/woo_service.py @@ -11,16 +11,6 @@ logger = get_logger(__name__) -_global_client: httpx.AsyncClient | None = None - - -async def close_woo_client() -> None: - """Close the global WooCommerce HTTP client.""" - global _global_client - if _global_client is not None: - await _global_client.aclose() - _global_client = None - class WooService: def __init__(self, settings: Settings) -> None: @@ -30,12 +20,11 @@ def __init__(self, settings: Settings) -> None: self.base_url = f"{settings.woo_url.rstrip('/')}/wp-json/wc/v3/products" # Increased timeout due to slow WooCommerce search self.timeout = httpx.Timeout(15.0, connect=3.0) + self.client = httpx.AsyncClient() - def _get_client(self) -> httpx.AsyncClient: - global _global_client - if _global_client is None: - _global_client = httpx.AsyncClient() - return _global_client + async def close(self) -> None: + """Close the internal HTTP client.""" + await self.client.aclose() def _parse_product_list(self, data: list[dict[str, Any]]) -> 
list[WooProduct]: """Parse a list of raw WooCommerce products into a list of WooProduct.""" @@ -59,7 +48,7 @@ def _parse_product_list(self, data: list[dict[str, Any]]) -> list[WooProduct]: async def _fetch_and_parse_single(self, params: dict[str, str | int]) -> WooProduct | None: """Fetch and parse a single product from WooCommerce.""" - client = self._get_client() + client = self.client try: resp = await client.get( self.base_url, params=params, auth=(self.woo_ck, self.woo_cs), timeout=self.timeout @@ -88,7 +77,7 @@ async def _fetch_and_parse_single(self, params: dict[str, str | int]) -> WooProd async def _fetch_products_list(self, params: dict[str, Any], context: str) -> list[WooProduct]: """Helper to fetch and parse a list of products.""" - client = self._get_client() + client = self.client products: list[WooProduct] = [] try: resp = await client.get( @@ -174,7 +163,7 @@ async def get_daily_orders_stats(self) -> dict[str, Any]: tags: dict[str, int] = {} orders_url = self.base_url.replace("/products", "/orders") - client = self._get_client() + client = self.client try: resp = await client.get( @@ -240,7 +229,7 @@ async def get_order_async(self, order_id: int) -> dict[str, Any] | None: from app.services.woo_smart_parser import parse_order order_url = f"{self.base_url.replace('/products', '/orders')}/{order_id}" - client = self._get_client() + client = self.client try: resp = await client.get( diff --git a/app/services/woo_smart_parser.py b/app/services/woo_smart_parser.py index 761516f..3177697 100644 --- a/app/services/woo_smart_parser.py +++ b/app/services/woo_smart_parser.py @@ -93,7 +93,7 @@ def parse_product(raw_product: dict[str, Any] | None, max_desc_length: int = 400 clean_data["attributes"] = attributes # 2. 
def _first_valid_ip(candidates: list[str]) -> str | None:
    """Return the first candidate that parses as an IPv4/IPv6 address, else ``None``."""
    import ipaddress

    for raw in candidates:
        value = raw.strip()
        if not value:
            continue
        try:
            return str(ipaddress.ip_address(value))
        except ValueError:
            # Not an address (garbage, markup, host:port, ...): try the next one.
            continue
    return None


def get_client_ip(request: Request) -> str:
    """
    Extract the client IP for a request that arrives through a reverse proxy.

    Lookup order: the first valid address in ``X-Forwarded-For``, then
    ``X-Real-IP``, then ``request.client.host``, then the slowapi fallback.
    Header values are only accepted when they parse as an IP address, because
    the result is embedded in alerts and used as a client identity; a
    malformed header is ignored instead of being returned verbatim.

    NOTE(review): the left-most ``X-Forwarded-For`` entry is supplied by the
    client when the proxy appends to the header, so it can still be spoofed
    with a well-formed address. Confirm that the proxy overwrites the header,
    or rely on uvicorn's ``--proxy-headers`` / ``--forwarded-allow-ips``.
    """
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for:
        forwarded_ip = _first_valid_ip(forwarded_for.split(","))
        if forwarded_ip:
            return forwarded_ip

    real_ip = request.headers.get("X-Real-IP")
    if real_ip:
        parsed_real_ip = _first_valid_ip([real_ip])
        if parsed_real_ip:
            return parsed_real_ip

    if request.client and request.client.host:
        return request.client.host
    return get_remote_address(request)
Передаю ваш запит менеджеру, він незабаром зв'яжеться з вами." @@ -21,6 +22,9 @@ History: {history_context} +Session State (Recent Search/View tracking from DB): +{session_state_json} + Current Query: "{question}" Your task is to return a JSON with fields: "intent", "product_name", "strict_query", "broad_query", "category_query", "normalized_faq_queries" (must be an array of strings). @@ -28,12 +32,17 @@ RULES: 1. If the client specifies a SPECIFIC model name (e.g. "Acer CZ342CUR"): -> {{"intent": "{intent_product}", "product_name": "Exact Name", "strict_query": null, "broad_query": null, "category_query": "Choose EXACT category name from the list below. If unsure - null", "normalized_faq_queries": []}} -2. If the client provides price limits ("15000-20000"), asks for multiple items, or provides a general category with features ("wireless gaming mouse Logitech G Pro"): - -> {{"intent": "{intent_search}", "product_name": null, "strict_query": "full commercial query with adjectives", "broad_query": "MAX 1-3 most important words: only brand and basic model, or only category. The shorter the better!", "category_query": "Choose EXACT category name from the list below. If unsure - null", "normalized_faq_queries": []}} +2. If the client provides price limits, asks for multiple items, or provides a general category/brand with features: + -> If the current query is an incomplete thought (just a brand, feature, or adjective like "samsung", "with 144hz"), COMBINE it with the last_search_query from Session State or History to form the full query (e.g., "samsung monitor"). + -> If they ask for a completely NEW and different product type (e.g., they asked for a monitor before, but now ask for "телевізор"), IGNORE the history/state and use ONLY the new query. + -> {{"intent": "{intent_search}", "product_name": null, "strict_query": "full combined query", "broad_query": "short query", "category_query": "EXACT category name", "normalized_faq_queries": []}} 3. 
If the client uses pronouns ("this", "it") or asks a clarifying question EXCLUSIVELY about features (without specifying brand): - -> Find the last model in the History. Intent must be "{intent_product}". + -> ANALYZE the Session State (last_products) and History to find the EXACT product name the client is referring to. + -> {{"intent": "{intent_product}", "product_name": "EXACT PRODUCT NAME FROM SESSION STATE", "strict_query": null, "broad_query": null, "category_query": null, "normalized_faq_queries": []}} + -> If there are multiple products in last_products and it's ambiguous, output intent "{intent_general}" to ask for clarification. 4. If the client explicitly expresses a desire to BUY or PLACE AN ORDER: - -> {{"intent": "{intent_checkout}", "product_name": "FOUND_NAME_FROM_HISTORY", "strict_query": null, "broad_query": null, "category_query": null, "normalized_faq_queries": []}} + -> Find the exact product name from Session State or History. + -> {{"intent": "{intent_checkout}", "product_name": "EXACT PRODUCT NAME", "strict_query": null, "broad_query": null, "category_query": null, "normalized_faq_queries": []}} 5. CRITICAL (Hybrid): The presence of ANY question about delivery, payment, warranty, or installment plan FORCES the intent to "{intent_hybrid}". This is the only way to return FAQ queries. -> {{"intent": "{intent_hybrid}", "product_name": "model name (if any)", "strict_query": "commercial query (if any)", "broad_query": "short query (if any)", "category_query": "EXACT category name", "normalized_faq_queries": ["Extract the user's FAQ question and translate it into a short Ukrainian query (e.g., 'оплата частинами', 'умови доставки')"]}} 6. CRITICAL: For "category_query" choose the EXACT name from this list: [{categories_str}]. If nothing fits - null. @@ -50,9 +59,9 @@ # --- Internal Bot Instructions (System Prompts augmentations) --- INSTR_NO_DUPLICATE_LINKS = "ВКАЗІВКА: Посилання вже згенеровані системою. Не дублюй їх у тексті." 
INSTR_NOTHING_FOUND = "Інформація: за запитом нічого не знайдено. Запропонуй клієнту залишити номер телефону, щоб менеджер підібрав аналог." -INSTR_FALLBACK_CATEGORY = "Ти шукав конкретику, але бекенд знайшов товари лише за категорією. Адаптуй відповідь: якщо клієнт шукав КОНКРЕТНУ модель, ввічливо скажи, що її немає, але є альтернативи. Якщо запит клієнта був загальним (наприклад 'які є SSD'), просто радо презентуй знайдені товари без жодних вибачень." -INSTR_BROAD_SEARCH_FALLBACK = "The backend performed an extended search for the query '{broad_term}'. Adapt the response: if the client looked for a SPECIFIC model ('{strict_term}'), politely say it is missing, but there are alternatives. If general, just present." -INSTR_CHECKOUT_TELEGRAM = "Клієнт хоче купити '{product_name}'. ТВОЯ ЗАДАЧА:\nСкажи, що він може оформити замовлення самостійно (кнопка вже згенерована) АБО просто залишити номер телефону тут, і менеджер все оформить сам." +INSTR_FALLBACK_CATEGORY = "Бекенд не знайшов конкретної моделі, але знайшов товари за категорією. Ввічливо скажи, що шуканої моделі немає, але запропонуй альтернативи." +INSTR_BROAD_SEARCH_FALLBACK = "Бекенд виконав розширений пошук за запитом '{broad_term}', оскільки конкретна модель '{strict_term}' відсутня. Ввічливо скажи, що шуканої моделі немає, але є альтернативи." +INSTR_CHECKOUT_TELEGRAM = "Клієнт хоче купити '{product_name}'." INSTR_PRODUCT_FOUND = "Бекенд знайшов товар '{product_name}' за пошуковим запитом клієнта. Обов'язково презентуй його клієнту і не кажи, що інформації не знайдено." INSTR_PRODUCT_CONTEXT = "Бекенд нагадує: поточний товар у контексті розмови — '{product_name}'. Відповідай на запитання клієнта лаконічно, спираючись на ці дані. Не презентуй товар наново, якщо тебе про це не просили." INSTR_ALERT_FAILED = "УВАГА: Виникла технічна помилка зв'язку з менеджером (система сповіщень не працює). 
Вибачся перед клієнтом за тимчасові незручності і попроси його залишити свій номер телефону або Telegram/Viber прямо в чаті, щоб ми могли зв'язатися з ним як тільки систему буде відновлено." diff --git a/docker-compose.yml b/docker-compose.yml index 2492868..b801d9e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ services: build: context: . dockerfile: Dockerfile - container_name: bubblebrain-app + container_name: digitaldreams-app env_file: - .env ports: @@ -17,7 +17,7 @@ services: prometheus: image: prom/prometheus:latest - container_name: bubblebrain-prometheus + container_name: digitaldreams-prometheus command: - '--config.file=/etc/prometheus/prometheus.yml' - '--storage.tsdb.path=/prometheus' diff --git a/docs/Gemini_Generated_Image_64q1x664q1x664q1.png b/docs/Gemini_Generated_Image_64q1x664q1x664q1.png new file mode 100644 index 0000000..6f46653 Binary files /dev/null and b/docs/Gemini_Generated_Image_64q1x664q1x664q1.png differ diff --git a/docs/diagram-export-6-21-2026-4_56_21-PM.jpg b/docs/diagram-export-6-21-2026-4_56_21-PM.jpg new file mode 100644 index 0000000..3b36cc3 Binary files /dev/null and b/docs/diagram-export-6-21-2026-4_56_21-PM.jpg differ diff --git a/docs/diagram-export-6-21-2026-4_56_21-PM.png b/docs/diagram-export-6-21-2026-4_56_21-PM.png new file mode 100644 index 0000000..49433ef Binary files /dev/null and b/docs/diagram-export-6-21-2026-4_56_21-PM.png differ diff --git a/frontend/chat-widget.js b/frontend/chat-widget.js index 54645aa..cfcad36 100644 --- a/frontend/chat-widget.js +++ b/frontend/chat-widget.js @@ -55,7 +55,7 @@ class ChatWidget { await new Promise((resolve) => { styleLink.onload = resolve; styleLink.onerror = () => { - console.error("BubbleBrain: Failed to load chat-widget.css"); + console.error("digitaldreams: Failed to load chat-widget.css"); resolve(); // Fallback, щоб не заблокувати повністю }; this._shadow.appendChild(styleLink); @@ -144,7 +144,7 @@ class ChatWidget { toggleWindow() { if 
(!this._elements) { - console.warn("BubbleBrain Widget is not fully initialized yet."); + console.warn("digitaldreams Widget is not fully initialized yet."); return; } this._isOpen = !this._isOpen; @@ -322,7 +322,7 @@ class ChatWidget { } } catch (e) { console.error( - "BubbleBrain Stream: Failed to parse chunk", + "digitaldreams Stream: Failed to parse chunk", e, data, ); @@ -336,7 +336,7 @@ class ChatWidget { } } } catch (error) { - console.error("BubbleBrain Stream Error:", error); // ВІДСТЕЖЕННЯ РЕАЛЬНОГО ЗБОЮ + console.error("digitaldreams Stream Error:", error); // ВІДСТЕЖЕННЯ РЕАЛЬНОГО ЗБОЮ if (error.name === "AbortError") { throw new Error("Timeout запиту."); } diff --git a/frontend/embed.js b/frontend/embed.js index f91861b..1058171 100644 --- a/frontend/embed.js +++ b/frontend/embed.js @@ -16,22 +16,23 @@ if (!config.apiHost || !config.apiKey) { console.error( - "BubbleBrain Widget: Missing required configuration (apiHost, apiKey)", + "digitaldreams Widget: Missing required configuration (apiHost, apiKey)", ); return; } // Додаємо скрипт з логікою, якщо він ще не завантажений if (typeof window.ChatWidget === "undefined") { - const scriptUrl = new URL( - "chat-widget.js", - currentScript ? currentScript.src : window.location.href, - ).href + "?v=2"; + const scriptUrl = + new URL( + "chat-widget.js", + currentScript ? 
currentScript.src : window.location.href, + ).href + "?v=2"; const script = document.createElement("script"); script.src = scriptUrl; script.onload = () => { if (window.ChatWidgetInstance) { - console.warn("BubbleBrain Widget is already initialized."); + console.warn("digitaldreams Widget is already initialized."); return; } window.ChatWidgetInstance = new window.ChatWidget(config); @@ -39,7 +40,7 @@ document.head.appendChild(script); } else { if (window.ChatWidgetInstance) { - console.warn("BubbleBrain Widget is already initialized."); + console.warn("digitaldreams Widget is already initialized."); return; } window.ChatWidgetInstance = new window.ChatWidget(config); diff --git a/frontend/widget.html b/frontend/widget.html index fc11d68..93a27bd 100644 --- a/frontend/widget.html +++ b/frontend/widget.html @@ -3,7 +3,7 @@ - BubbleBrain AI Widget Demo + digitaldreams AI Widget Demo