0% encontró este documento útil (0 votos)
7 vistas60 páginas

Codigo Base Agente Mejorado v3

El documento describe la implementación de un sistema de gestión de memoria para un asistente farmacéutico, que incluye la gestión de errores tipográficos y reglas de interacción. Utiliza bibliotecas como pandas y faiss para el manejo de datos y la búsqueda de similitudes, y almacena información en formatos como Parquet y JSON. Además, se definen constantes, funciones de normalización de texto y estructuras de datos para registrar eventos y respuestas del asistente.

Cargado por

corporationqiu86
Derechos de autor
© © All Rights Reserved
Nos tomamos en serio los derechos de los contenidos. Si sospechas que se trata de tu contenido, reclámalo aquí.
Formatos disponibles
Descarga como DOCX, PDF, TXT o lee en línea desde Scribd
0% encontró este documento útil (0 votos)
7 vistas60 páginas

Codigo Base Agente Mejorado v3

El documento describe la implementación de un sistema de gestión de memoria para un asistente farmacéutico, que incluye la gestión de errores tipográficos y reglas de interacción. Utiliza bibliotecas como pandas y faiss para el manejo de datos y la búsqueda de similitudes, y almacena información en formatos como Parquet y JSON. Además, se definen constantes, funciones de normalización de texto y estructuras de datos para registrar eventos y respuestas del asistente.

Cargado por

corporationqiu86
Derechos de autor
© © All Rights Reserved
Nos tomamos en serio los derechos de los contenidos. Si sospechas que se trata de tu contenido, reclámalo aquí.
Formatos disponibles
Descarga como DOCX, PDF, TXT o lee en línea desde Scribd

import logging

import pandas as pd

import numpy as np

import json

import os

import faiss

import re

import time

import unicodedata

from typing import Dict, List, Optional, Tuple, Any

from concurrent.futures import ThreadPoolExecutor

from sentence_transformers import SentenceTransformer

from rapidfuzz import fuzz, process

from symspellpy import SymSpell

from llama_cpp import Llama

from datetime import datetime

import hashlib

import threading

# ===================== CONSTANTES & HELPERS


=====================

CODE_COLS = {"codigosap"}

ACRONYM_RE = re.compile(r"\b[A-ZÁÉÍÓÚÑ]{2,}\b")

LONG_NUM_RE = re.compile(r"\b\d{6,}\b")

APRENDIZAJE_DIR = r"D:\Proyecto_Smart\aprendizaje_ac\
asistente_productos"

os.makedirs(APRENDIZAJE_DIR, exist_ok=True)

# Parquets
PQT_EVENTS = os.path.join(APRENDIZAJE_DIR, "events_raw.parquet")

PQT_RULES = os.path.join(APRENDIZAJE_DIR,
"memory_rules.parquet")

PQT_FT = os.path.join(APRENDIZAJE_DIR, "dataset_ft.parquet")

PQT_TYPO = os.path.join(APRENDIZAJE_DIR, "typo_map.parquet")

# Legacy JSON (migración opcional)

JSON_VALID = os.path.join(APRENDIZAJE_DIR,
"preguntas_validadas.json")

JSON_ERR = os.path.join(APRENDIZAJE_DIR,
"preguntas_erradas.json")

JSON_CORR = os.path.join(APRENDIZAJE_DIR,
"preguntas_corregidas.json")

SCHEMA_VERSION = 1

TEMPLATE_ID_DV1 = "Dv1"

HUMANIZE = {

"molecula": "Molécula",

"marca": "Marca",

"formafarmaceutica": "Forma farmacéutica",

"concentracion": "Concentración",

"areadefabricacion": "Área de fabricación",

"fabricacion": "Origen de fabricación",

"tipodemolecula": "Tipo de molécula",

"tipodeproducto": "Tipo de producto",

"presentacionesdeproducto": "Presentación (comercial)",

"codigosap": "Código SAP",

"presentacionesproductosap": "Descripción SAP",

"cantidadenunidades": "Cantidad en unidades",


"gerente": "Gerente",

"lineacomercial": "Línea comercial",

"grupodetrabajo": "Grupo de trabajo"

# Nuevos prompts más naturales

PROMPT_NLG = """

Eres un experto farmacéutico que responde preguntas sobre


productos de manera profesional pero amigable.

Genera una respuesta natural en español basada en los datos


proporcionados, siguiendo estas pautas:

1. Comienza reconociendo la pregunta del usuario de manera natural

2. Integra la información en frases completas y fluidas

3. Destaca los datos más relevantes para la pregunta

4. Agrupa información relacionada de manera lógica

5. Evita enumeraciones como en una tabla

6. Termina con una pregunta relevante para continuar la conversación

Ejemplo de estilo:

"Entiendo que buscas información sobre [producto]. Este


medicamento contiene [molécula] y está disponible en
[presentación]. Pertenece a la línea [línea comercial]. ¿Necesitas
algún detalle adicional?"

Datos encontrados:

{resultados_json}

Pregunta del usuario:

{pregunta}

"""
PROMPT_RETRY = "No encontré la información exacta que buscas,
pero intenté con otros criterios. Esto es lo que hallé:\n\n{respuesta}"

PROMPT_SUGERENCIA = ("Parece que no encontré coincidencias


exactas. ¿Podrías ayudarme a entender mejor?\n"

"Por ejemplo, puedes especificar si buscas por:\n"

"- Nombre comercial del producto\n"

"- Marca\n"

"- Molécula activa\n"

"- Código SAP\n\n"

"O reformula tu pregunta con más detalles.")

def _norm_txt(s: str) -> str:

if not isinstance(s, str):

return ""

s = unicodedata.normalize("NFKD", s).encode("ascii",
"ignore").decode("ascii")

return re.sub(r"\s+", " ", s.strip().lower())

def _make_hash(*parts: str) -> str:

h = hashlib.sha256()

for p in parts:

h.update((p or "").encode("utf-8"))

return h.hexdigest()

def _safe_write_parquet(df: pd.DataFrame, path: str):

tmp = path + ".tmp"

df.to_parquet(tmp, index=False)

os.replace(tmp, path)
def _read_parquet_or_empty(path: str, cols: List[str]) ->
pd.DataFrame:

if os.path.exists(path):

return pd.read_parquet(path)

return pd.DataFrame(columns=cols)

# ===================== TYPO MEMORY


=====================

class TypoMemory:

_lock = threading.Lock()

def __init__(self):

self.cols = ["typo","correction","count","last_seen"]

self.df = _read_parquet_or_empty(PQT_TYPO, self.cols)

def get(self, typo: str) -> Optional[str]:

m = self.df["typo"] == typo

if m.any():

return self.df[m].iloc[-1]["correction"]

return None

def upsert(self, typo: str, correction: str):

ts = datetime.now().isoformat(timespec='seconds')

with self._lock:

m = (self.df["typo"]==typo) &
(self.df["correction"]==correction)

if m.any():

idx = self.df[m].index[0]

self.df.loc[idx, "count"] = (self.df.loc[idx, "count"] or 0) + 1

self.df.loc[idx, "last_seen"] = ts

else:
self.df.loc[len(self.df)] =
{"typo":typo,"correction":correction,"count":1,"last_seen":ts}

_safe_write_parquet(self.df, PQT_TYPO)

# ===================== RULE MEMORY


=====================

class RuleMemory:

_lock = threading.Lock()

def __init__(self, embedder: SentenceTransformer):

self.embedder = embedder

self.columns = [

"canonical_q_id","q_text_canonica","q_embed_json",

"field_hints_json","expected_answer","dont_say_json",

"last_update","uses_count","quality_score_rule","schema_version"

self.df = _read_parquet_or_empty(PQT_RULES, self.columns)

self._refresh_matrix()

def _refresh_matrix(self):

if self.df.empty:

self._emb_matrix = np.zeros((0,1), dtype=np.float32)

else:

embeds = self.df["q_embed_json"].fillna("[]").map(lambda x:
np.array(json.loads(x), dtype=np.float32))

self._emb_matrix = np.vstack(embeds.values) if
len(embeds)>0 else np.zeros((0,1),dtype=np.float32)

def _find_similar(self, q_embed: np.ndarray, thresh: float=0.90) ->


Optional[int]:

if self._emb_matrix.shape[0] == 0:
return None

sims = self._emb_matrix @ q_embed /


(np.linalg.norm(self._emb_matrix, axis=1)*np.linalg.norm(q_embed)
+1e-9)

idx = int(np.argmax(sims))

return idx if sims[idx] >= thresh else None

def get_rule(self, query: str, thresh: float=0.90) -> Optional[Dict]:

q_embed = self.embedder.encode([query]).astype(np.float32)[0]

idx = self._find_similar(q_embed, thresh)

if idx is None:

return None

row = self.df.iloc[idx]

return {

"canonical_q_id": row["canonical_q_id"],

"q_text_canonica": row["q_text_canonica"],

"field_hints": json.loads(row["field_hints_json"] or "[]"),

"expected_answer": row["expected_answer"] or "",

"dont_say": json.loads(row["dont_say_json"] or "[]"),

"quality_score_rule": row["quality_score_rule"] or 1.0

def upsert_rule(self, pregunta: str, field_hints: List[str],


expected_answer: str,

quality: float=1.0):

q_embed = self.embedder.encode([pregunta]).astype(np.float32)
[0]

with self._lock:

idx = self._find_similar(q_embed, thresh=0.92)

now = datetime.now().isoformat(timespec='seconds')
if idx is None:

self.df.loc[len(self.df)] = {

"canonical_q_id": _make_hash(pregunta, now),

"q_text_canonica": pregunta,

"q_embed_json": json.dumps(q_embed.tolist()),

"field_hints_json": json.dumps(list(set(field_hints or []))),

"expected_answer": expected_answer,

"dont_say_json": json.dumps([]),

"last_update": now,

"uses_count": 0,

"quality_score_rule": quality,

"schema_version": SCHEMA_VERSION

else:

old_hints = json.loads(self.df.loc[idx, "field_hints_json"] or


"[]")

merged = list(set(old_hints + (field_hints or [])))

self.df.loc[idx, "q_text_canonica"] = pregunta

self.df.loc[idx, "q_embed_json"] =
json.dumps(q_embed.tolist())

self.df.loc[idx, "field_hints_json"] = json.dumps(merged)

if expected_answer:

self.df.loc[idx, "expected_answer"] = expected_answer

old_q = self.df.loc[idx, "quality_score_rule"] or 1.0

self.df.loc[idx, "quality_score_rule"] = (old_q + quality)/2

self.df.loc[idx, "last_update"] = now

_safe_write_parquet(self.df, PQT_RULES)

self._refresh_matrix()

def record_use(self, canonical_q_id: str):


with self._lock:

m = self.df["canonical_q_id"] == canonical_q_id

if m.any():

idx = self.df[m].index[0]

self.df.loc[idx, "uses_count"] = (self.df.loc[idx, "uses_count"]


or 0) + 1

_safe_write_parquet(self.df, PQT_RULES)

# ===================== MEMORY SUPERVISOR


=====================

class MemorySupervisor:

_lock = threading.Lock()

def __init__(self):

self.ev_cols = [

"event_id","timestamp","interaction_id","tipo_evento","subtipo","esta
do",

"pregunta","respuesta","contexto","feedback","campo_solicitado",

"valores_correctos","nombre_humano","who","schema_version"

self.events_df = _read_parquet_or_empty(PQT_EVENTS,
self.ev_cols)

self.ft_cols =
["sample_id","instruction","input","output","source_rule_id","quality",

"template_id","version_schema","fecha_inclusion"]

self.ft_df = _read_parquet_or_empty(PQT_FT, self.ft_cols)

self.erradas_sim =
self.events_df[(self.events_df["tipo_evento"]=="interaction") &
(self.events_df["subtipo"]=="errada")]
["pregunta"].dropna().tolist()

self.memoria_turnos: List[Dict] = []

if self.events_df.empty and (os.path.exists(JSON_VALID) or


os.path.exists(JSON_ERR)):

self._migrate_legacy_json()

def _migrate_legacy_json(self):

def load(path):

if os.path.isfile(path):

with open(path,'r',encoding='utf-8') as f:

return json.load(f)

return []

val, err, corr = load(JSON_VALID), load(JSON_ERR),


load(JSON_CORR)

rows = []

for r in val:

ts = r.get("timestamp",
datetime.now().isoformat(timespec='seconds'))

iid = _make_hash(r.get("pregunta",""), ts)

rows.append({

"event_id": _make_hash(iid,"validada",ts),

"timestamp": ts,

"interaction_id": iid,

"tipo_evento": "interaction",

"subtipo": "validada",

"estado": "validada",

"pregunta": r.get("pregunta",""),
"respuesta": r.get("respuesta",""),

"contexto": r.get("contexto",{}),

"feedback": "s",

"campo_solicitado": None,

"valores_correctos": None,

"nombre_humano": None,

"who": "user",

"schema_version": SCHEMA_VERSION

})

for r in err:

ts = r.get("timestamp",
datetime.now().isoformat(timespec='seconds'))

iid = _make_hash(r.get("pregunta",""), ts)

rows.append({

"event_id": _make_hash(iid,"errada",ts),

"timestamp": ts,

"interaction_id": iid,

"tipo_evento": "interaction",

"subtipo": "errada",

"estado": "n/a",

"pregunta": r.get("pregunta",""),

"respuesta": r.get("respuesta",""),

"contexto": r.get("contexto",{}),

"feedback": "n",

"campo_solicitado": None,

"valores_correctos": None,

"nombre_humano": None,

"who": "user",

"schema_version": SCHEMA_VERSION
})

for r in corr:

ts = r.get("timestamp",
datetime.now().isoformat(timespec='seconds'))

iid = _make_hash(r.get("pregunta",""), ts)

rows.append({

"event_id": _make_hash(iid,"corregida",ts),

"timestamp": ts,

"interaction_id": iid,

"tipo_evento": "correction",

"subtipo": "corregida",

"estado": "validada",

"pregunta": r.get("pregunta",""),

"respuesta": r.get("respuesta_original",""),

"contexto": None,

"feedback": "n",

"campo_solicitado": r.get("campo_solicitado",""),

"valores_correctos": r.get("valores_correctos",[]),

"nombre_humano": r.get("nombre_humano",""),

"who": "user",

"schema_version": SCHEMA_VERSION

})

if rows:

self.events_df = pd.concat([self.events_df,
pd.DataFrame(rows)], ignore_index=True)

_safe_write_parquet(self.events_df, PQT_EVENTS)

self.erradas_sim =
self.events_df[(self.events_df["tipo_evento"]=="interaction") &

(self.events_df["subtipo"]=="errada")]
["pregunta"].dropna().tolist()
def log_event(self, **kwargs):

with self._lock:

if "event_id" not in kwargs or not kwargs["event_id"]:

kwargs["event_id"] =
_make_hash(kwargs.get("interaction_id",""),

kwargs.get("timestamp",""),

kwargs.get("tipo_evento",""))

if (self.events_df["event_id"]==kwargs["event_id"]).any():

return

self.events_df.loc[len(self.events_df)] = {c: kwargs.get(c,


None) for c in self.ev_cols}

_safe_write_parquet(self.events_df, PQT_EVENTS)

def add_turn(self, pregunta, respuesta, contexto):

turno = {

"timestamp": datetime.now().isoformat(timespec='seconds'),

"pregunta": pregunta,

"respuesta": respuesta,

"feedback": "",

"contexto": contexto

iid = _make_hash(pregunta, turno["timestamp"])

self.memoria_turnos.append(turno)

self.log_event(event_id=_make_hash(iid,"raw",turno["timestamp"]),

timestamp=turno["timestamp"],

interaction_id=iid,

tipo_evento="interaction",

subtipo="raw",
estado="n/a",

pregunta=pregunta,

respuesta=respuesta,

contexto=contexto,

feedback="",

campo_solicitado=None,

valores_correctos=None,

nombre_humano=None,

who="agent",

schema_version=SCHEMA_VERSION)

return turno, iid

def mark_valid(self, turno, iid):

self.log_event(event_id=_make_hash(iid,"validada",turno["timestamp"
]),

timestamp=turno["timestamp"],

interaction_id=iid,

tipo_evento="interaction",

subtipo="validada",

estado="validada",

pregunta=turno["pregunta"],

respuesta=turno["respuesta"],

contexto=turno["contexto"],

feedback="s",

campo_solicitado=None,

valores_correctos=None,

nombre_humano=None,

who="user",

schema_version=SCHEMA_VERSION)
def mark_errada(self, turno, iid):

self.log_event(event_id=_make_hash(iid,"errada",turno["timestamp"]),

timestamp=turno["timestamp"],

interaction_id=iid,

tipo_evento="interaction",

subtipo="errada",

estado="n/a",

pregunta=turno["pregunta"],

respuesta=turno["respuesta"],

contexto=turno["contexto"],

feedback="n",

campo_solicitado=None,

valores_correctos=None,

nombre_humano=None,

who="user",

schema_version=SCHEMA_VERSION)

self.erradas_sim.append(turno["pregunta"])

def save_correction(self, turno, iid, campo, valores):

ts = datetime.now().isoformat(timespec='seconds')

self.log_event(event_id=_make_hash(iid,"corregida",ts),

timestamp=ts,

interaction_id=iid,

tipo_evento="correction",

subtipo="corregida",

estado="validada",

pregunta=turno["pregunta"],
respuesta=turno["respuesta"],

contexto=None,

feedback="n",

campo_solicitado=campo,

valores_correctos=valores,

nombre_humano=HUMANIZE.get(campo,campo),

who="user",

schema_version=SCHEMA_VERSION)

def add_ft_sample(self, instruction: str, output: str,

source_rule_id: Optional[str],

input_text: str = "",

quality: float = 1.0,

tipo_muestra: str = "Q/A"):

mask = (self.ft_df["instruction"]==instruction) &


(self.ft_df["output"]==output)

if mask.any():

return

sid = _make_hash(instruction, output, str(datetime.now()))

row = {

"sample_id": sid,

"instruction": instruction,

"input": input_text,

"output": output,

"source_rule_id": source_rule_id,

"quality": quality,

"template_id": TEMPLATE_ID_DV1,

"version_schema": SCHEMA_VERSION,

"fecha_inclusion":
datetime.now().isoformat(timespec='seconds')
}

with self._lock:

self.ft_df.loc[len(self.ft_df)] = row

_safe_write_parquet(self.ft_df, PQT_FT)

def find_similar_errada(self, pregunta):

if not self.erradas_sim:

return False

for prev in self.erradas_sim[-50:]:

if fuzz.token_set_ratio(pregunta, prev) >= 95:

return True

return False

def clear_errada_flag(self, pregunta: str):

self.erradas_sim = [p for p in self.erradas_sim if


fuzz.token_set_ratio(p, pregunta) < 95]

def export_session(self, path):

with open(path, 'w', encoding='utf-8') as f:

json.dump(self.memoria_turnos, f, ensure_ascii=False,
indent=2)

# ===================== CONFIG
=====================

class Config:

def __init__(self, d: Dict[str, Any]):

self.ruta_parquet = d['ruta_parquet']

self.ruta_faiss = d['ruta_faiss']

self.ruta_fragmentos = d['ruta_fragmentos']

self.ruta_modelo = d['ruta_modelo']
self.ruta_prompt_config = d['ruta_prompt_config']

self.embedding_model = d.get('embedding_model', 'all-MiniLM-


L6-v2')

self.fuzzy_threshold = d.get('fuzzy_threshold', 80)

self.fuzzy_top = d.get('fuzzy_top', 5)

self.nlg_max_items = d.get('nlg_max_items', 8)

self.n_ctx = d.get('n_ctx', 4096)

self.n_threads = d.get('n_threads', 4)

self.use_spellcheck = d.get('use_spellcheck', True)

self.nlg_use_llm = d.get('nlg_use_llm', True)

self.semantic_fallback = d.get('semantic_fallback', True)

self.debug = d.get('debug', False)

self.ruta_diccionario_symspell =
d.get('ruta_diccionario_symspell')

self.block_erradas = d.get('block_erradas', False)

# ===================== AGENTE INTELIGENTE


=====================

class PharmaSearchAgent:

def __init__(self, config: Dict[str, Any]):

self.config = Config(config)

self._setup_logging()

self._load_prompt_config()

self._load_resources()

self._build_indices()

self._setup_templates()

self.memoria = MemorySupervisor()

self.rules = RuleMemory(self.embedder)

self.typos = TypoMemory()

self.last_query = ""
self.logger.info("✅ Sistema inicializado correctamente")

def _setup_logging(self):

import sys, io

logger = logging.getLogger("PharmaSearchAgent")

logger.setLevel(logging.DEBUG if self.config.debug else


logging.INFO)

try:

sys.stdout.reconfigure(encoding='utf-8', errors='replace')

sys.stderr.reconfigure(encoding='utf-8', errors='replace')

stream = sys.stdout

except Exception:

stream = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-


8', errors='replace')

handler = logging.StreamHandler(stream)

handler.setLevel(logging.DEBUG if self.config.debug else


logging.INFO)

fmt = logging.Formatter("%(asctime)s [%(levelname)s] %


(name)s:%(lineno)d - %(message)s", "%Y-%m-%d %H:%M:%S")

handler.setFormatter(fmt)

logger.handlers.clear()

logger.addHandler(handler)

logger.propagate = False

self.logger = logger

def _load_prompt_config(self):

with open(self.config.ruta_prompt_config, 'r', encoding='utf-8') as


f:

cfg = json.load(f)

self.aliases = cfg.get('aliases', {})

self.intents = cfg.get('intents', {
"codes": ["codigo","código","codigos","sap","sku","id"],

"forms": ["forma","farmaceutica","farmacéutica","formas"],

"presentations":["presentacion","presentaciones"],

"names":["nombre","descripcion","descripción"],

"all":["todos","todas","lista","listar"]

})

self.priority_cols = cfg.get('priority_cols', [

"presentacionesdeproducto","molecula","marca","lineacomercial",

"grupodetrabajo","formafarmaceutica","gerente","tipodemolecula","ti
podeproducto"

])

self.stopwords = set(cfg.get('stopwords', [

"que","cual","cuál","tengo","es","el","la","los","las","de","del","para","
en","con",

"maneja","ve","contiene","dime","dame","busca","buscar","listar","list
a","todos","todas"

]))

def _load_resources(self):

tasks = {

'data': self._load_data,

'emb': self._load_embeddings,

'faiss': self._load_faiss,

'frags': self._load_fragments,

'llm': self._load_llm,

'spell': self._setup_spellcheck

}
with ThreadPoolExecutor() as ex:

futs = {ex.submit(func): n for n, func in tasks.items()}

for fut in futs:

name = futs[fut]

try:

fut.result()

self.logger.info("%s cargado", name.upper())

except Exception as e:

self.logger.error("Error %s: %s", name, e)

if name in ("data","llm"):

raise

def _load_data(self):

self.df = pd.read_parquet(self.config.ruta_parquet)

self.df.columns = self.df.columns.str.lower()

for col in ['molecula','presentacionesdeproducto','marca']:

if col in self.df.columns:

self.df[col+"_norm"] =
self.df[col].astype(str).map(_norm_txt)

self.domain_vocab = set()

for col in ['molecula','presentacionesdeproducto','marca']:

if col in self.df.columns:

self.domain_vocab.update(self.df[col].astype(str).str.lower().unique())

def _load_embeddings(self):

self.embedder =
SentenceTransformer(self.config.embedding_model, device='cpu')

def _load_faiss(self):
try:

self.faiss_index = faiss.read_index(self.config.ruta_faiss)

except Exception as e:

self.logger.error(f"FAISS no disponible: {e}")

self.faiss_index = None

def _load_fragments(self):

with open(self.config.ruta_fragmentos,'r',encoding='utf-8') as f:

self.text_fragments = json.load(f)

def _load_llm(self):

self.llm = Llama(model_path=self.config.ruta_modelo,

n_ctx=self.config.n_ctx,

n_threads=self.config.n_threads,

verbose=False)

def _setup_spellcheck(self):

if not self.config.use_spellcheck:

self.spellchecker = None

return

self.spellchecker = SymSpell(max_dictionary_edit_distance=2,
prefix_length=7)

if self.config.ruta_diccionario_symspell:

self.spellchecker.load_dictionary(self.config.ruta_diccionario_symspell,
0, 1, encoding='utf-8')

def _build_indices(self):

self.inverted_indices = {}

for col in ['molecula','presentacionesdeproducto','marca']:


ncol = col+"_norm"

if ncol in self.df.columns:

tok_series = self.df[ncol].str.split()

mapping = {}

for idx, toks in tok_series.items():

if isinstance(toks, list):

for t in toks:

mapping.setdefault(t, set()).add(idx)

self.inverted_indices[col] = mapping

# --- NUEVA IMPLEMENTACIÓN INTELIGENTE ---

def generate_response(self, query: str) -> Tuple[str,


Optional[Dict], pd.DataFrame, str]:

"""

Versión mejorada con análisis semántico profundo y respuesta


contextual.

Devuelve: (respuesta, regla_utilizada, dataframe_resultados,


modo_busqueda)

"""

# 1. Preprocesamiento inteligente

query_corregida, correcciones, ambiguos =


self.fix_tokens(query)

# 2. Análisis semántico avanzado

analisis = self._analizar_consulta_avanzada(query_corregida)

# 3. Búsqueda relacional adaptativa

resultados, modo = self._busqueda_adaptativa(analisis)

# 4. Generación de respuesta contextual


respuesta = self._generar_respuesta_adaptativa(analisis,
resultados, query_corregida)

# 5. Registro para aprendizaje

self._registrar_interaccion(

pregunta_original=query,

pregunta_procesada=query_corregida,

analisis=analisis,

resultados=resultados,

respuesta=respuesta

return respuesta, None, resultados, modo

def _analizar_consulta_avanzada(self, query: str) -> Dict:

"""Analiza la consulta usando embeddings y relaciones


aprendidas"""

# Embedding de la pregunta

embedding = self.embedder.encode([query])[0]

return {

'embedding': embedding,

'entidades': self._extraer_entidades(query),

'atributos': self._extraer_atributos(query),

'tipo_pregunta': self._clasificar_tipo_pregunta(embedding),

'preguntas_similares':
self._buscar_preguntas_similares(embedding)

}
def _busqueda_adaptativa(self, analisis: Dict) ->
Tuple[pd.DataFrame, str]:

"""Ejecuta la búsqueda más adecuada según el análisis"""

# Búsqueda exacta primero si hay entidad específica

if analisis['entidades']:

resultado = self._buscar_por_entidad(analisis['entidades']
[0])

if not resultado.empty:

return resultado, "exacta"

# Búsqueda semántica como fallback

return self._busqueda_semantica(analisis['embedding']),
"semantica"

def _generar_respuesta_adaptativa(self, analisis: Dict,


resultados: pd.DataFrame, pregunta: str) -> str:

"""Genera respuestas naturales para cualquier tipo de


consulta"""

plantillas = {

'entidad_atributo': (

"El {entidad} '{valor_entidad}' tiene los siguientes


{atributo}: {valores}\n"

"¿Necesitas más detalles sobre algún producto en


particular?"

),

'comparacion': (

"He encontrado estos productos relacionados:\


n{productos}\n"

"¿Qué información específica necesitas?"

),

'general': (
"Esto es lo que encontré sobre tu consulta:\
n{resultados}\n"

"¿Quieres refinar la búsqueda?"

# Seleccionar plantilla según tipo de pregunta

tipo = analisis['tipo_pregunta']

if tipo in plantillas:

return plantillas[tipo].format(

entidad=analisis['entidades'][0] if analisis['entidades']
else 'producto',

atributo=analisis['atributos'][0] if analisis['atributos'] else


'datos',

valor_entidad=resultados.iloc[0][analisis['entidades'][0]]
if analisis['entidades'] else '',

valores=", ".join(resultados[analisis['atributos']
[0]].unique()) if analisis['atributos'] else '',

productos="\n-
".join(resultados['presentacionesdeproducto'].unique()),

resultados=resultados.to_string(index=False)

return "Aquí tienes la información solicitada:\n" +


resultados.to_string(index=False)

def _exact_contains(self, fields: List[str], value: str) ->


pd.DataFrame:

"""

Búsqueda exacta en los campos especificados.


Args:

fields: Lista de campos donde buscar

value: Valor a buscar (texto normalizado)

Returns:

DataFrame con los resultados que coinciden exactamente

"""

if not value or not fields:

return pd.DataFrame()

value_norm = _norm_txt(value)

cond = pd.Series(False, index=self.df.index)

for field in fields:

if field in self.df.columns:

# Búsqueda exacta (ignore case)

if field in CODE_COLS:

# Para códigos, búsqueda exacta sin normalizar

cond |= (self.df[field].astype(str) == value)

else:

# Para otros campos, búsqueda normalizada

if f"{field}_norm" in self.df.columns:

cond |= (self.df[f"{field}_norm"] == value_norm)

else:

cond |= (self.df[field].astype(str).str.lower() ==
value_norm)

return self.df[cond].copy()
def _search_code(self, code: str) -> pd.DataFrame:

"""

Búsqueda exacta por código SAP.

Args:

code: Código a buscar (como string)

Returns:

DataFrame con el producto que coincide con el código

"""

if not code or 'codigosap' not in self.df.columns:

return pd.DataFrame()

return self.df[self.df['codigosap'].astype(str) == str(code)].copy()

def _token_or(self, field: str, value: str) -> pd.DataFrame:

"""

Búsqueda por tokens (cualquier palabra coincide).

Args:

field: Campo donde buscar

value: Valor con múltiples tokens a buscar

Returns:

DataFrame con resultados donde al menos un token coincide

"""

if not value or field not in self.df.columns:

return pd.DataFrame()
tokens = [t for t in _norm_txt(value).split() if len(t) > 3]

if not tokens:

return pd.DataFrame()

col_norm = f"{field}_norm" if f"{field}_norm" in self.df.columns


else field

series = self.df[col_norm].astype(str).map(_norm_txt)

cond = pd.Series(False, index=self.df.index)

for t in tokens:

cond |= series.str.contains(t, regex=False)

return self.df[cond].copy()

def _descubrir_relacion(self, target: str, atributo: str) -> str:

"""Descubre automáticamente cómo se relacionan los


campos."""

# Análisis estadístico de co-ocurrencias

freq = self.df.groupby([target,
atributo]).size().reset_index(name='count')

if not freq.empty:

return f"Los valores de {atributo} asociados a {target} son:


{', '.join(freq[atributo].unique())}"

return f"No encontré relación directa entre {target} y


{atributo}"

def _generar_respuesta_inteligente(self, df: pd.DataFrame,


pregunta: str) -> str:

"""Genera respuestas dinámicas para cualquier combinación."""

# Análisis lingüístico profundo

tokens = self._analizar_pregunta(pregunta)
# Extracción de entidades y atributos

entidades = self._identificar_entidades(tokens)

atributos = self._identificar_atributos(tokens)

# Construcción de respuesta adaptativa

if len(entidades) == 1 and len(atributos) == 1:

target = entidades[0]

attr = atributos[0]

if not df.empty:

valores = df[attr].dropna().unique()

return (f"Para {target}, los valores de {attr} son: " +

", ".join(valores[:5]) +

("..." if len(valores) > 5 else ""))

return self._descubrir_relacion(target, attr)

# Respuesta para múltiples relaciones

return self._explicar_relaciones_complejas(df, entidades,


atributos)

def _analizar_pregunta(self, pregunta: str) -> Dict:

"""Análisis profundo con NLP (sin dependencia de keywords)"""

# Embeddings + clustering semántico

embedding = self.embedder.encode([pregunta])[0]

similares = self._buscar_preguntas_similares(embedding)

return {

'embedding': embedding,

'campos_relevantes': self._inferir_campos(embedding),
'tipo_respuesta': self._predecir_tipo_respuesta(embedding),

'preguntas_similares': similares

def _buscar_preguntas_similares(self, embedding: np.array) -> List:

"""Busca preguntas similares en la base de conocimiento"""

# Implementación con FAISS

pass

def _inferir_campos(self, embedding: np.array) -> List[str]:

"""Predice campos relevantes usando el embedding"""

# Modelo de clasificación simple

campos_posibles = list(HUMANIZE.keys())

scores = [np.dot(embedding, self.field_embeddings[c]) for c in


campos_posibles]

return [campos_posibles[i] for i in np.argsort(scores)[-3:]]

# Nuevo sistema de aprendizaje

def _aprender_de_interaccion(self, pregunta: str, respuesta: str,


feedback: str):

"""Aprendizaje reforzado profundo"""

# Registro de la interacción

self.memoria.registrar_interaccion(

pregunta=pregunta,

respuesta=respuesta,

feedback=feedback,

embedding=self.embedder.encode([pregunta])[0]

# Ajuste dinámico de pesos


if feedback == 'positivo':

self._reforzar_patron(pregunta, respuesta)

else:

self._ajustar_modelo(pregunta, respuesta)

def _reforzar_patron(self, pregunta: str, respuesta: str):

"""Refuerza los patrones exitosos"""

# Actualización de embeddings de campos

pass

def _setup_templates(self):

self.templates = {

'single': (

"Te cuento sobre el producto {nom}:\n"

"- Contiene la molécula {mol}\n"

"- Presentación: {ff} {conc}\n"

"- Línea comercial: {linea}\n"

"- Gerente: {gerente}\n\n"

"¿Necesitas más información sobre este producto?"

),

'multi': (

"Encontré estos productos relacionados con tu consulta:\n\


n"

"{rows}\n\n"

"{extra}"

),

'nf': (

"No encontré resultados exactos para \"{q}\".\n"

"Puedes intentar con:\n"


"- Nombre comercial completo\n"

"- Molécula principal\n"

"- Código SAP\n\n"

"¿Quieres reformular la búsqueda?"

),

'field_only': (

"En relación a tu consulta sobre {campo}, estos son los


valores encontrados:\n"

"{valores}\n\n"

"{extra}"

),

'typo_ask': (

"¿Detecté errores de escritura?:\n{lista}\n"

"Puedes corregir la frase completa o presionar Enter para


dejarla igual:"

def _parse(self, q: str) -> Dict[str, Any]:

"""Analiza la consulta para extraer intención, campos relevantes


y valores."""

result = {

"is_code": False,

"code": "",

"intent": "general",

"fields": [],

"value": "",

"dont_say": []

}
# Normalizar consulta

qn = _norm_txt(q)

# Detección de código SAP (6+ dígitos)

code_match = re.search(r"\b\d{6,}\b", q)

if code_match:

result["is_code"] = True

result["code"] = code_match.group()

result["intent"] = "code"

result["fields"] = ["codigosap"]

result["value"] = q

return result

# Detección de intención específica

intent = self._detect_intent(qn)

result["intent"] = intent["type"]

# Campos basados en la intención

if intent["fields"]:

result["fields"] = intent["fields"]

# Extraer términos relevantes (eliminar stopwords)

tokens = [t for t in qn.split()

if t not in self.stopwords

and not re.match(r"\d{1,5}$", t)] # Ignorar números cortos

# Valor de búsqueda (sin campos específicos)

result["value"] = " ".join(tokens)


return result

def _detect_intent(self, q: str) -> Dict[str, Any]:

"""Detecta la intención de la consulta con nivel de confianza."""

ql = q.lower()

intent = {

"type": "general",

"fields": [],

"priority": 1,

"certainty": 0.8

# 1. Detección por campo solicitado explícitamente

campo_pregunta = self._detectar_campo_pregunta(q)

if campo_pregunta:

intent.update({

"type": "field_query",

"fields": [campo_pregunta],

"priority": 2,

"certainty": 0.95

})

return intent

# 2. Detección por palabras clave

kw_mapping = {

'code': (self.intents.get('codes', []), ['codigosap']),

'forms': (self.intents.get('forms', []), ['formafarmaceutica']),

'present': (self.intents.get('presentations', []),


['presentacionesdeproducto']),
'names': (self.intents.get('names', []),
['presentacionesproductosap']),

'all': (self.intents.get('all', []),


['molecula','presentacionesdeproducto','marca'])

for intent_type, (words, fields) in kw_mapping.items():

if any(w in ql for w in words):

intent.update({

"type": intent_type,

"fields": fields,

"priority": 2,

"certainty": 0.9

})

break

# 3. Detección semántica (campos mencionados)

if intent["type"] == "general":

tokens = ql.split()

for col in self.df.columns:

if col in tokens and col not in intent["fields"]:

intent["fields"].append(col)

intent["certainty"] = max(intent["certainty"], 0.85)

return intent

def _retrieve_rule(self, query: str) -> Optional[Dict]:

"""Recupera la regla más similar a la consulta desde la memoria


de reglas."""

try:
# Buscar regla similar con un umbral de similitud alta

rule = self.rules.get_rule(query, thresh=0.88)

if rule:

self.logger.debug(f"Regla recuperada:
{rule['q_text_canonica']}")

# Registrar uso para priorizar reglas frecuentes

self.rules.record_use(rule['canonical_q_id'])

# Filtrar campos no existentes en los datos actuales

valid_fields = [f for f in rule['field_hints'] if f in


self.df.columns]

if valid_fields:

rule['field_hints'] = valid_fields

else:

rule['field_hints'] = self.priority_cols

return rule

return None

except Exception as e:

self.logger.error(f"Error al recuperar regla: {e}")

return None

def _apply_rule_to_parsed(self, parsed: Dict[str, Any], rule: Dict[str,


Any]) -> Dict[str, Any]:

"""Aplica los hints de la regla al parseo de la consulta."""

if not rule or not parsed:

return parsed
# Priorizar campos sugeridos por la regla

hints = rule.get('field_hints', [])

if hints:

parsed['fields'] = hints + [f for f in parsed.get('fields', []) if f


not in hints]

# Aplicar restricciones de la regla

if 'dont_say' in rule:

parsed['dont_say'] = rule['dont_say']

return parsed

def fix_tokens(self, text: str) -> Tuple[str, Dict[str, str], Dict[str,


List[str]]]:

"""Corrección ortográfica avanzada con soporte para términos


del dominio."""

if not self.config.use_spellcheck or not text:

return text, {}, {}

tokens = text.split()

corrected: Dict[str, str] = {} # {original: corrección}

ambiguous: Dict[str, List[str]] = {} # {original: [sugerencias]}

new_toks: List[str] = []

for t in tokens:

tn = _norm_txt(t)

# No corregir si es acrónimo, número largo o ya está en el


vocabulario

if (ACRONYM_RE.search(t) or (LONG_NUM_RE.search(t)) or (tn


in self.domain_vocab)):
new_toks.append(t)

continue

# Primero verificar correcciones aprendidas

learned_corr = self.typos.get(tn)

if learned_corr:

corrected[t] = learned_corr

new_toks.append(learned_corr)

continue

# Generar candidatos de corrección

candidates: Dict[str, int] = {}

# Correcciones del diccionario SymSpell

if self.spellchecker:

suggestions = self.spellchecker.lookup(tn, verbosity=2)

for sug in suggestions:

score = max(0, 100 - sug.distance * 10)

candidates[sug.term] = max(candidates.get(sug.term,
0), score)

# Correcciones basadas en el vocabulario del dominio

domain_matches = process.extract(tn, list(self.domain_vocab),


limit=3, scorer=fuzz.token_set_ratio)

for term, score, _ in domain_matches:

if score >= 70: # Umbral mínimo para considerar

candidates[term] = max(candidates.get(term, 0), score)

if not candidates:

new_toks.append(t)
continue

# Seleccionar la mejor corrección

best_candidate = max(candidates.items(), key=lambda x:


x[1])

best_term, best_score = best_candidate

# Aplicar corrección según confianza

if best_score >= 90: # Alta confianza - corregir


automáticamente

corrected[t] = best_term

new_toks.append(best_term)

self.typos.upsert(tn, best_term)

elif best_score >= 80: # Confianza media - preguntar al


usuario

top_candidates = [k for k, v in sorted(candidates.items(),

key=lambda x: x[1],

reverse=True)[:3]]

ambiguous[t] = top_candidates

new_toks.append(t)

else: # Baja confianza - dejar como está

new_toks.append(t)

return " ".join(new_toks), corrected, ambiguous

def _ask_user_for_typo_correction(self, original: str, suggestions:


List[str]) -> str:

"""Interfaz para que el usuario seleccione corrección."""

print(f"\n¿Quieres corregir '{original}'? Opciones:")

for i, sug in enumerate(suggestions, 1):


print(f"{i}. {sug}")

print("0. Dejar como está")

while True:

choice = input("Selecciona una opción (número): ")

if choice.isdigit():

choice_idx = int(choice)

if 0 <= choice_idx <= len(suggestions):

return suggestions[choice_idx-1] if choice_idx > 0 else


original

print("Opción inválida. Intenta nuevamente.")

# ========== MÉTODOS DE BÚSQUEDA MEJORADOS


==========

def _exact_search(self, parsed: dict) -> pd.DataFrame:

"""Búsqueda exacta con priorización de campos"""

campos = parsed.get('fields', [])

value = parsed.get('value', '')

if parsed.get('is_code'):

return self._search_code(parsed['code'])

# Si no hay campos específicos, usar los prioritarios

if not campos:

campo_pregunta =
self._detectar_campo_pregunta(parsed['value'])

if campo_pregunta:

campos = [campo_pregunta]

else:

campos = self.priority_cols
# Intentar diferentes estrategias de búsqueda exacta

for campo in campos:

res = self._exact_contains([campo], value)

if not res.empty:

return res

res = self._token_or(campo, value)

if not res.empty:

return res

return pd.DataFrame()

def _semantic_fallback(self, query: str) -> pd.DataFrame:

"""Búsqueda semántica usando FAISS"""

if not hasattr(self, "faiss_index") or self.faiss_index is None:

return pd.DataFrame()

embed = self.embedder.encode([query]).astype(np.float32)

D, I = self.faiss_index.search(embed, k=10)

idxs = [i for i in I[0] if i >= 0]

return self.df.iloc[idxs] if idxs else pd.DataFrame()

def _fuzzy_search(self, parsed: dict) -> pd.DataFrame:

"""Búsqueda aproximada con manejo inteligente de campos"""

campos = parsed.get('fields', self.priority_cols)

value = parsed.get('value', '')

# Primero intentar con campos prioritarios


res = self._fuzzy(campos, value)

if not res.empty:

return res

# Si no funciona, intentar con todos los campos

return self._fuzzy(self.df.columns.tolist(), value)

def _wide_search(self, query: str) -> pd.DataFrame:

"""Búsqueda amplia en múltiples campos"""

v = _norm_txt(query)

cols =
["presentacionesdeproducto","presentacionesproductosap","molecula
","marca","lineacomercial","grupodetrabajo"]

cond = pd.Series(False, index=self.df.index)

for c in cols:

if c in self.df.columns:

cond |= self.df[c].astype(str).map(_norm_txt).str.contains(v,
regex=False)

return self.df[cond]

def search_products(self, query: str, rule: Optional[Dict],


reintento=0) -> Tuple[pd.DataFrame, str]:

"""Búsqueda mejorada que combina múltiples estrategias"""

parsed = self._parse(query)

if rule:

parsed = self._apply_rule_to_parsed(parsed, rule)

# NO bloquear: solo aviso

if not rule and self.memoria.find_similar_errada(query):

self.logger.warning("⚠️Similar a errada previa, pero intentaré


igual con fallback controlado.")
# Estrategia 1: Búsqueda exacta en campos prioritarios

df_res = self._exact_search(parsed)

if not df_res.empty:

return df_res, "exacta"

# Estrategia 2: Búsqueda semántica con FAISS

if self.config.semantic_fallback:

df_res = self._semantic_fallback(query)

if not df_res.empty:

return df_res, "semántica"

# Estrategia 3: Búsqueda aproximada

df_res = self._fuzzy_search(parsed)

if not df_res.empty:

return df_res, "aproximada"

# Estrategia 4: Búsqueda amplia

df_res = self._wide_search(query)

if not df_res.empty:

return df_res.head(50), "amplia"

return pd.DataFrame(), "no encontrado"

# ========== DETECCIÓN DE CAMPO SOLICITADO


==========

def _detectar_campo_pregunta(self, pregunta: str) -> Optional[str]:

q = _norm_txt(pregunta)

# Directo por clave/alias


for campo, nombre in HUMANIZE.items():

if campo in q or _norm_txt(nombre) in q:

return campo

# También busca en posibles alias si tienes definidos en el JSON


de config

if hasattr(self, "aliases"):

for alias, campo in self.aliases.items():

if alias in q:

return campo

return None

# ========== GENERACIÓN DE RESPUESTAS MEJORADA


==========

def generate_response(self, query: str) -> Tuple[str, Optional[Dict],


pd.DataFrame, str]:

t0 = time.time()

fixed_q, auto_corr, ambig = self.fix_tokens(query)

if auto_corr:

self.logger.debug(f"Correcciones auto: {auto_corr}")

if ambig:

lista = "\n".join([f"- '{o}': {', '.join(c)}" for o,c in


ambig.items()])

user_fix = input(self.templates['typo_ask'].format(lista=lista)
+ "\n> ").strip()

if user_fix:

fixed_q = user_fix

for orig, cands in ambig.items():

for c in cands:

if c in _norm_txt(user_fix).split():

self.typos.upsert(_norm_txt(orig), c)

break
# Detectar si la pregunta es sobre un campo específico

campo_pregunta = self._detectar_campo_pregunta(fixed_q)

rule = self._retrieve_rule(fixed_q)

df_res, modo = self.search_products(fixed_q, rule, reintento=0)

# Si la pregunta es sobre un campo específico, responder solo


ese campo

if campo_pregunta and campo_pregunta in self.df.columns:

self.logger.info(f"[INTELIGENTE] Respondo solo el campo


solicitado: {HUMANIZE.get(campo_pregunta,campo_pregunta)}")

resp = self._gen_nlg_field(df_res, campo_pregunta, fixed_q)

self.logger.debug(f"Tiempo total respuesta: {time.time()-


t0:.2f}s")

return resp, rule, df_res, "campo"

# Respuesta normal si no se detectó campo explícito

if df_res.empty:

resp = "No encontré coincidencias exactas. ¿Podrías revisar la


escritura o proporcionar más detalles?"

self.logger.debug(f"Tiempo total respuesta: {time.time()-


t0:.2f}s")

return resp, rule, df_res, modo

# Generar respuesta basada en el modo de búsqueda

style_hint = rule["expected_answer"] if rule else ""

use_llm = self.config.nlg_use_llm

if modo == "exacta":

resp = self._gen_nlg(df_res, fixed_q, modo="exacta",


style_hint=style_hint, use_llm=use_llm)
elif modo in ("semántica", "aproximada"):

resp = PROMPT_RETRY.format(respuesta=self._gen_nlg(df_res,
fixed_q, modo=modo, style_hint=style_hint, use_llm=use_llm))

elif modo == "no encontrado":

resp = PROMPT_SUGERENCIA

else:

resp = "No encontré información relevante. ¿Puedes


reformular tu pregunta?"

resp = self._postprocess_response(resp)

self.logger.debug(f"Tiempo total respuesta: {time.time()-


t0:.2f}s")

return resp, rule, df_res, modo

def _gen_nlg(self, df_res: pd.DataFrame, query: str,


modo="normal",

style_hint: str = "", use_llm=True) -> str:

if df_res.empty:

return self._response_not_found(query)

# Preprocesar datos para el LLM

processed_data = []

for _, row in df_res.iterrows():

item = {}

for col, val in row.items():

if pd.notna(val):

human_col = HUMANIZE.get(col, col)

item[human_col] = str(val)

processed_data.append(item)
if use_llm:

try:

prompt = PROMPT_NLG.format(

pregunta=query,

resultados_json=json.dumps(processed_data,
ensure_ascii=False, indent=2)

out = self.llm(prompt, max_tokens=800, temperature=0.7)

resp = out['choices'][0]['text'].strip()

if resp:

return resp

except Exception as e:

self.logger.error(f"Error en LLM: {e}")

# Fallback a respuestas predefinidas

if len(df_res) == 1:

row = df_res.iloc[0]

data = {

'nom': row.get('presentacionesdeproducto', 'N/A'),

'mol': row.get('molecula', 'N/A'),

'ff': row.get('formafarmaceutica', 'N/A'),

'conc': row.get('concentracion', 'N/A'),

'linea': row.get('lineacomercial', 'N/A'),

'gerente': row.get('gerente', 'N/A')

return self.templates['single'].format(**data)

else:

return self._response_multi(df_res, query)


def _gen_nlg_field(self, df_res: pd.DataFrame, campo: str, pregunta:
str, style_hint: str = "") -> str:

human = HUMANIZE.get(campo, campo)

if df_res.empty or campo not in df_res.columns:

return f"No tengo información sobre {human.lower()} para


esta consulta."

vals = df_res[campo].dropna().astype(str).unique().tolist()

if len(vals) == 1:

return f"El valor de {human} para esta consulta es:


{vals[0]}."

elif len(vals) <= 5:

lista = ", ".join(vals[:-1]) + f" y {vals[-1]}" if len(vals) > 1 else


vals[0]

return f"Los valores de {human} encontrados son: {lista}."

else:

primeras = vals[:5]

lista = ", ".join(primeras[:-1]) + f" y {primeras[-1]}" if


len(primeras) > 1 else primeras[0]

extra = "… Hay más valores disponibles. Pídeme \"todos\" si


quieres la lista completa."

return self.templates['field_only'].format(

campo=human.lower(),

valores=lista,

extra=extra

# ========== MÉTODOS AUXILIARES ==========

def _response_multi(self, df: pd.DataFrame, query: str) -> str:

limit = 50 if re.search(r"\btodo[s]?\b|\btodas\b|\blista\b",
_norm_txt(query)) else self.config.nlg_max_items
df = df.drop_duplicates('codigosap',
ignore_index=True).head(limit)

lines = [

f"• {r.get('presentacionesdeproducto','(sin nombre)')}:


molécula {r.get('molecula','N/A')}, "

f"marca {r.get('marca','N/A')}, gerente


{r.get('gerente','N/A')}"

for _, r in df.iterrows()

extra = "" if len(df) < limit else "\n… Hay más resultados.
Pide \"todos\" para ver la lista completa."

return "\n".join(lines) + extra

def _response_not_found(self, q: str) -> str:

sugs = self._suggest(q)

sug_text = "\n".join(f"- {s}" for s in sugs) if sugs else "- No hay


sugerencias disponibles."

return self.templates['nf'].format(q=q, sug=sug_text)

def _suggest(self, q: str) -> List[str]:

sugs = []

if 'presentacionesdeproducto' in self.df.columns:

sugs += [m[0] for m in process.extract(q,


self.df['presentacionesdeproducto'].dropna().unique(), limit=3,
scorer=fuzz.token_set_ratio) if m[1] >= 60]

if not sugs and 'molecula' in self.df.columns:

sugs += [m[0] for m in process.extract(q,


self.df['molecula'].dropna().unique(), limit=3,
scorer=fuzz.token_set_ratio) if m[1] >= 60]

return sugs[:3]

def _postprocess_response(self, text: str) -> str:


if not text:

return text

# Eliminar duplicados

lines = text.splitlines()

unique_lines = []

seen = set()

for line in lines:

key = line.strip().lower()

if key not in seen:

unique_lines.append(line)

seen.add(key)

# Unir y limpiar espacios

cleaned = "\n".join(unique_lines)

cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip()

return cleaned

# ========== INTERFAZ DE USUARIO ==========

def _show_available_fields(self):

print("\nCampos disponibles para búsqueda:")

for i, (key, val) in enumerate(HUMANIZE.items(), 1):

print(f"{i}. {val}")

def _ask_for_correct_field(self):

while True:

campo = input("\n¿Qué campo esperabas? (número o


nombre): ").strip()

if campo.isdigit():

num = int(campo)

if 1 <= num <= len(HUMANIZE):


return list(HUMANIZE.keys())[num-1]

campo_lower = campo.lower()

for key, val in HUMANIZE.items():

if campo_lower == val.lower() or campo_lower ==


key.lower():

return key

print("Campo no válido. Intenta nuevamente.")

def process_feedback(self, turno: Dict, iid: str, df_res:


pd.DataFrame, feedback: str):

"""Proceso mejorado para manejar feedback"""

if feedback == "s":

self._handle_positive_feedback(turno, iid)

elif feedback == "m":

self._handle_improvement_feedback(turno, iid)

elif feedback == "n":

self._handle_negative_feedback(turno, iid, df_res)

def _handle_positive_feedback(self, turno: Dict, iid: str):

"""Manejo de feedback positivo"""

self.memoria.mark_valid(turno, iid)

self.memoria.clear_errada_flag(turno["pregunta"])

self.rules.upsert_rule(turno["pregunta"], [], turno["respuesta"],


quality=1.0)

self.memoria.add_ft_sample(turno["pregunta"],
turno["respuesta"], None)

mej = input("✏️¿Quieres mejorar cómo está redactada la


respuesta? [S/N]: ").strip().lower()

if mej in ("s","si"):
esperado = input("Escribe la respuesta esperada tal como la
quieres ver:\n> ").strip()

self.rules.upsert_rule(turno["pregunta"], [], esperado,


quality=1.0)

self.memoria.add_ft_sample(turno["pregunta"], esperado,
None, tipo_muestra="Q/A_expected")

print("👌 Guardé tu versión esperada.")

def _handle_improvement_feedback(self, turno: Dict, iid: str):

"""Manejo de solicitud de mejora"""

self.memoria.mark_valid(turno, iid)

self.memoria.clear_errada_flag(turno["pregunta"])

esperado = input("Escribe la respuesta esperada tal como la


quieres ver:\n> ").strip()

self.rules.upsert_rule(turno["pregunta"], [], esperado,


quality=1.0)

self.memoria.add_ft_sample(turno["pregunta"], esperado, None,


tipo_muestra="Q/A_expected")

print("👌 Guardé tu versión mejorada.")

def _handle_negative_feedback(self, turno: Dict, iid: str, df_res:


pd.DataFrame):

"""Manejo mejorado de feedback negativo"""

print("\nEntiendo que la respuesta no fue adecuada. Vamos a


mejorarla paso a paso:")

# Paso 1: Identificar qué parte falló

print("\n1. ¿Qué aspecto de la respuesta no fue correcto?")

print(" a) La información mostrada es incorrecta")

print(" b) Faltó información importante")

print(" c) El formato no fue adecuado")

print(" d) No entendió bien mi pregunta")


issue = input("Selecciona una opción (a-d): ").lower()

# Paso 2: Soluciones específicas para cada problema

if issue == "a":

self._correct_wrong_info(turno, iid, df_res)

elif issue == "b":

self._add_missing_info(turno, iid, df_res)

elif issue == "c":

self._improve_format(turno, iid)

elif issue == "d":

self._clarify_question(turno, iid)

def _correct_wrong_info(self, turno: Dict, iid: str, df_res:


pd.DataFrame):

"""Corrección de información incorrecta"""

self._show_available_fields()

campo = self._ask_for_correct_field()

if campo:

# Generar nueva respuesta enfocada en el campo correcto

nueva_resp = self._gen_nlg_field(df_res, campo,


turno['pregunta'])

print("\n🤖 (Corrección aplicada):")

print(nueva_resp, "\n")

# Guardar corrección

valores = df_res[campo].dropna().unique().tolist() if campo in


df_res.columns else []

self.memoria.save_correction(turno, iid, campo, valores)


# Aprender de la corrección

self.rules.upsert_rule(

turno["pregunta"],

[campo],

nueva_resp,

quality=1.0

self.memoria.add_ft_sample(

turno["pregunta"],

nueva_resp,

None,

tipo_muestra="Q/A_corrected"

# Confirmar si la nueva respuesta es adecuada

confirm = input("¿Esta nueva respuesta es correcta? [S/N]:


").lower()

if confirm == "s":

self.memoria.mark_valid(turno, iid)

self.memoria.clear_errada_flag(turno["pregunta"])

def _add_missing_info(self, turno: Dict, iid: str, df_res:


pd.DataFrame):

"""Agregar información faltante"""

print("\n¿Qué información faltó en la respuesta?")

self._show_available_fields()

campo = self._ask_for_correct_field()

if campo:
# Generar respuesta con la información adicional

nueva_resp = turno["respuesta"] + "\n\n" +


self._gen_nlg_field(df_res, campo, turno['pregunta'])

print("\n🤖 (Respuesta ampliada):")

print(nueva_resp, "\n")

# Guardar corrección

self.rules.upsert_rule(

turno["pregunta"],

[campo],

nueva_resp,

quality=1.0

self.memoria.add_ft_sample(

turno["pregunta"],

nueva_resp,

None,

tipo_muestra="Q/A_augmented"

confirm = input("¿Esta respuesta ampliada es correcta? [S/N]:


").lower()

if confirm == "s":

self.memoria.mark_valid(turno, iid)

self.memoria.clear_errada_flag(turno["pregunta"])

def _improve_format(self, turno: Dict, iid: str):

"""Mejorar el formato de la respuesta"""

print("\nLa respuesta anterior fue:")

print(turno["respuesta"])
esperado = input("\nEscribe cómo prefieres que se formatee la
respuesta:\n> ").strip()

if esperado:

self.rules.upsert_rule(turno["pregunta"], [], esperado,


quality=1.0)

self.memoria.add_ft_sample(turno["pregunta"], esperado,
None, tipo_muestra="Q/A_formatted")

print("\n🤖 (Nuevo formato):")

print(esperado, "\n")

confirm = input("¿Este formato es correcto? [S/N]: ").lower()

if confirm == "s":

self.memoria.mark_valid(turno, iid)

self.memoria.clear_errada_flag(turno["pregunta"])

def _clarify_question(self, turno: Dict, iid: str):

"""Aclarar la pregunta mal entendida"""

print("\nParece que no entendí bien tu pregunta original:")

print(f"Pregunta original: {turno['pregunta']}")

nueva_preg = input("\n¿Puedes reformular tu pregunta para que


la entienda mejor?\n> ").strip()

if nueva_preg:

self.memoria.mark_errada(turno, iid)

print("Gracias por la aclaración. Puedes hacer la nueva


pregunta ahora.")

def run_cli(self):

print("🔍 Asistente de Búsqueda Farmacéutica - escribe 'salir' para


terminar")
print(" (Usa 'exportar' para guardar la sesión o 'campos' para
ver opciones)")

while True:

q = input("\n👤 Tú: ").strip()

self.last_query = q

if q.lower() in ("salir","exit","quit"):

break

if q.lower() == "campos":

self._show_available_fields()

continue

if q.lower().startswith("exportar"):

path = os.path.join(APRENDIZAJE_DIR,
f"historial_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")

self.memoria.export_session(path)

print(f"\n📝 Sesión exportada en: {path}")

continue

try:

t_inicio = time.time()

respuesta, rule, df_res, modo = self.generate_response(q)

print("\n🤖", respuesta)

turno, iid = self.memoria.add_turn(q, respuesta, {

"modo_busqueda": modo,

"fecha": datetime.now().isoformat(timespec='seconds')

})
# Feedback loop

while True:

fb = input("\n✅ ¿La respuesta fue correcta?


[S=Si/M=Mejorar/N=No]: ").strip().lower()

fb = {"si":"s","mejorar":"m","no":"n"}.get(fb, fb)

if fb not in ("s","m","n"):

print("Por favor responde con S, M o N")

continue

self.process_feedback(turno, iid, df_res, fb)

break

self.logger.info(f"Tiempo total: {time.time()-t_inicio:.2f}s")

except Exception as e:

self.logger.error(f"Error: {e}")

print("⚠️Ocurrió un error procesando tu consulta")

# ===================== MAIN
=====================

if __name__ == "__main__":

config = {

'ruta_parquet': r"D:\Proyecto_Smart\datos_limpios\productos\
tabla_productos_limpia.parquet",

'ruta_faiss': r"D:\Proyecto_Smart\datos_limpios\productos\
index_productos.faiss",

'ruta_fragmentos': r"D:\Proyecto_Smart\datos_limpios\productos\
fragmentos_texto_productos.json",

'ruta_modelo': r"D:\Proyecto_Smart\modelos_generativos\
deepseek-llm-7b-chat.Q8_0.gguf",
'ruta_prompt_config': r"D:\Proyecto_Smart\datos_limpios\
productos\prompt_config.json",

'embedding_model': 'all-MiniLM-L6-v2',

'fuzzy_threshold': 70,

'fuzzy_top': 5,

'ruta_diccionario_symspell': r"D:\Proyecto_Smart\datos_limpios\
diccionario\frequency_dictionary_final.txt",

'nlg_max_items': 8,

'n_ctx': 4096,

'n_threads': 4,

'use_spellcheck': True,

'nlg_use_llm': True,

'semantic_fallback': True,

'debug': False,

'block_erradas': False

agent = PharmaSearchAgent(config)

agent.run_cli()

También podría gustarte