Codigo Base Agente Mejorado v3
Codigo Base Agente Mejorado v3
import pandas as pd
import numpy as np
import json
import os
import faiss
import re
import time
import unicodedata
import hashlib
import threading
CODE_COLS = {"codigosap"}
ACRONYM_RE = re.compile(r"\b[A-ZÁÉÍÓÚÑ]{2,}\b")
LONG_NUM_RE = re.compile(r"\b\d{6,}\b")
APRENDIZAJE_DIR = r"D:\Proyecto_Smart\aprendizaje_ac\
asistente_productos"
os.makedirs(APRENDIZAJE_DIR, exist_ok=True)
# Parquets
PQT_EVENTS = os.path.join(APRENDIZAJE_DIR, "events_raw.parquet")
PQT_RULES = os.path.join(APRENDIZAJE_DIR,
"memory_rules.parquet")
JSON_VALID = os.path.join(APRENDIZAJE_DIR,
"preguntas_validadas.json")
JSON_ERR = os.path.join(APRENDIZAJE_DIR,
"preguntas_erradas.json")
JSON_CORR = os.path.join(APRENDIZAJE_DIR,
"preguntas_corregidas.json")
SCHEMA_VERSION = 1
TEMPLATE_ID_DV1 = "Dv1"
HUMANIZE = {
"molecula": "Molécula",
"marca": "Marca",
"concentracion": "Concentración",
PROMPT_NLG = """
Ejemplo de estilo:
Datos encontrados:
{resultados_json}
{pregunta}
"""
PROMPT_RETRY = "No encontré la información exacta que buscas,
pero intenté con otros criterios. Esto es lo que hallé:\n\n{respuesta}"
"- Marca\n"
return ""
s = unicodedata.normalize("NFKD", s).encode("ascii",
"ignore").decode("ascii")
h = hashlib.sha256()
for p in parts:
h.update((p or "").encode("utf-8"))
return h.hexdigest()
df.to_parquet(tmp, index=False)
os.replace(tmp, path)
def _read_parquet_or_empty(path: str, cols: List[str]) ->
pd.DataFrame:
if os.path.exists(path):
return pd.read_parquet(path)
return pd.DataFrame(columns=cols)
class TypoMemory:
_lock = threading.Lock()
def __init__(self):
self.cols = ["typo","correction","count","last_seen"]
m = self.df["typo"] == typo
if m.any():
return self.df[m].iloc[-1]["correction"]
return None
ts = datetime.now().isoformat(timespec='seconds')
with self._lock:
m = (self.df["typo"]==typo) &
(self.df["correction"]==correction)
if m.any():
idx = self.df[m].index[0]
self.df.loc[idx, "last_seen"] = ts
else:
self.df.loc[len(self.df)] =
{"typo":typo,"correction":correction,"count":1,"last_seen":ts}
_safe_write_parquet(self.df, PQT_TYPO)
class RuleMemory:
_lock = threading.Lock()
self.embedder = embedder
self.columns = [
"canonical_q_id","q_text_canonica","q_embed_json",
"field_hints_json","expected_answer","dont_say_json",
"last_update","uses_count","quality_score_rule","schema_version"
self._refresh_matrix()
def _refresh_matrix(self):
if self.df.empty:
else:
embeds = self.df["q_embed_json"].fillna("[]").map(lambda x:
np.array(json.loads(x), dtype=np.float32))
self._emb_matrix = np.vstack(embeds.values) if
len(embeds)>0 else np.zeros((0,1),dtype=np.float32)
if self._emb_matrix.shape[0] == 0:
return None
idx = int(np.argmax(sims))
q_embed = self.embedder.encode([query]).astype(np.float32)[0]
if idx is None:
return None
row = self.df.iloc[idx]
return {
"canonical_q_id": row["canonical_q_id"],
"q_text_canonica": row["q_text_canonica"],
quality: float=1.0):
q_embed = self.embedder.encode([pregunta]).astype(np.float32)
[0]
with self._lock:
now = datetime.now().isoformat(timespec='seconds')
if idx is None:
self.df.loc[len(self.df)] = {
"q_text_canonica": pregunta,
"q_embed_json": json.dumps(q_embed.tolist()),
"expected_answer": expected_answer,
"dont_say_json": json.dumps([]),
"last_update": now,
"uses_count": 0,
"quality_score_rule": quality,
"schema_version": SCHEMA_VERSION
else:
self.df.loc[idx, "q_embed_json"] =
json.dumps(q_embed.tolist())
if expected_answer:
_safe_write_parquet(self.df, PQT_RULES)
self._refresh_matrix()
m = self.df["canonical_q_id"] == canonical_q_id
if m.any():
idx = self.df[m].index[0]
_safe_write_parquet(self.df, PQT_RULES)
class MemorySupervisor:
_lock = threading.Lock()
def __init__(self):
self.ev_cols = [
"event_id","timestamp","interaction_id","tipo_evento","subtipo","esta
do",
"pregunta","respuesta","contexto","feedback","campo_solicitado",
"valores_correctos","nombre_humano","who","schema_version"
self.events_df = _read_parquet_or_empty(PQT_EVENTS,
self.ev_cols)
self.ft_cols =
["sample_id","instruction","input","output","source_rule_id","quality",
"template_id","version_schema","fecha_inclusion"]
self.erradas_sim =
self.events_df[(self.events_df["tipo_evento"]=="interaction") &
(self.events_df["subtipo"]=="errada")]
["pregunta"].dropna().tolist()
self.memoria_turnos: List[Dict] = []
self._migrate_legacy_json()
def _migrate_legacy_json(self):
def load(path):
if os.path.isfile(path):
with open(path,'r',encoding='utf-8') as f:
return json.load(f)
return []
rows = []
for r in val:
ts = r.get("timestamp",
datetime.now().isoformat(timespec='seconds'))
rows.append({
"event_id": _make_hash(iid,"validada",ts),
"timestamp": ts,
"interaction_id": iid,
"tipo_evento": "interaction",
"subtipo": "validada",
"estado": "validada",
"pregunta": r.get("pregunta",""),
"respuesta": r.get("respuesta",""),
"contexto": r.get("contexto",{}),
"feedback": "s",
"campo_solicitado": None,
"valores_correctos": None,
"nombre_humano": None,
"who": "user",
"schema_version": SCHEMA_VERSION
})
for r in err:
ts = r.get("timestamp",
datetime.now().isoformat(timespec='seconds'))
rows.append({
"event_id": _make_hash(iid,"errada",ts),
"timestamp": ts,
"interaction_id": iid,
"tipo_evento": "interaction",
"subtipo": "errada",
"estado": "n/a",
"pregunta": r.get("pregunta",""),
"respuesta": r.get("respuesta",""),
"contexto": r.get("contexto",{}),
"feedback": "n",
"campo_solicitado": None,
"valores_correctos": None,
"nombre_humano": None,
"who": "user",
"schema_version": SCHEMA_VERSION
})
for r in corr:
ts = r.get("timestamp",
datetime.now().isoformat(timespec='seconds'))
rows.append({
"event_id": _make_hash(iid,"corregida",ts),
"timestamp": ts,
"interaction_id": iid,
"tipo_evento": "correction",
"subtipo": "corregida",
"estado": "validada",
"pregunta": r.get("pregunta",""),
"respuesta": r.get("respuesta_original",""),
"contexto": None,
"feedback": "n",
"campo_solicitado": r.get("campo_solicitado",""),
"valores_correctos": r.get("valores_correctos",[]),
"nombre_humano": r.get("nombre_humano",""),
"who": "user",
"schema_version": SCHEMA_VERSION
})
if rows:
self.events_df = pd.concat([self.events_df,
pd.DataFrame(rows)], ignore_index=True)
_safe_write_parquet(self.events_df, PQT_EVENTS)
self.erradas_sim =
self.events_df[(self.events_df["tipo_evento"]=="interaction") &
(self.events_df["subtipo"]=="errada")]
["pregunta"].dropna().tolist()
def log_event(self, **kwargs):
with self._lock:
kwargs["event_id"] =
_make_hash(kwargs.get("interaction_id",""),
kwargs.get("timestamp",""),
kwargs.get("tipo_evento",""))
if (self.events_df["event_id"]==kwargs["event_id"]).any():
return
_safe_write_parquet(self.events_df, PQT_EVENTS)
turno = {
"timestamp": datetime.now().isoformat(timespec='seconds'),
"pregunta": pregunta,
"respuesta": respuesta,
"feedback": "",
"contexto": contexto
self.memoria_turnos.append(turno)
self.log_event(event_id=_make_hash(iid,"raw",turno["timestamp"]),
timestamp=turno["timestamp"],
interaction_id=iid,
tipo_evento="interaction",
subtipo="raw",
estado="n/a",
pregunta=pregunta,
respuesta=respuesta,
contexto=contexto,
feedback="",
campo_solicitado=None,
valores_correctos=None,
nombre_humano=None,
who="agent",
schema_version=SCHEMA_VERSION)
self.log_event(event_id=_make_hash(iid,"validada",turno["timestamp"
]),
timestamp=turno["timestamp"],
interaction_id=iid,
tipo_evento="interaction",
subtipo="validada",
estado="validada",
pregunta=turno["pregunta"],
respuesta=turno["respuesta"],
contexto=turno["contexto"],
feedback="s",
campo_solicitado=None,
valores_correctos=None,
nombre_humano=None,
who="user",
schema_version=SCHEMA_VERSION)
def mark_errada(self, turno, iid):
self.log_event(event_id=_make_hash(iid,"errada",turno["timestamp"]),
timestamp=turno["timestamp"],
interaction_id=iid,
tipo_evento="interaction",
subtipo="errada",
estado="n/a",
pregunta=turno["pregunta"],
respuesta=turno["respuesta"],
contexto=turno["contexto"],
feedback="n",
campo_solicitado=None,
valores_correctos=None,
nombre_humano=None,
who="user",
schema_version=SCHEMA_VERSION)
self.erradas_sim.append(turno["pregunta"])
ts = datetime.now().isoformat(timespec='seconds')
self.log_event(event_id=_make_hash(iid,"corregida",ts),
timestamp=ts,
interaction_id=iid,
tipo_evento="correction",
subtipo="corregida",
estado="validada",
pregunta=turno["pregunta"],
respuesta=turno["respuesta"],
contexto=None,
feedback="n",
campo_solicitado=campo,
valores_correctos=valores,
nombre_humano=HUMANIZE.get(campo,campo),
who="user",
schema_version=SCHEMA_VERSION)
source_rule_id: Optional[str],
if mask.any():
return
row = {
"sample_id": sid,
"instruction": instruction,
"input": input_text,
"output": output,
"source_rule_id": source_rule_id,
"quality": quality,
"template_id": TEMPLATE_ID_DV1,
"version_schema": SCHEMA_VERSION,
"fecha_inclusion":
datetime.now().isoformat(timespec='seconds')
}
with self._lock:
self.ft_df.loc[len(self.ft_df)] = row
_safe_write_parquet(self.ft_df, PQT_FT)
if not self.erradas_sim:
return False
return True
return False
json.dump(self.memoria_turnos, f, ensure_ascii=False,
indent=2)
# ===================== CONFIG
=====================
class Config:
self.ruta_parquet = d['ruta_parquet']
self.ruta_faiss = d['ruta_faiss']
self.ruta_fragmentos = d['ruta_fragmentos']
self.ruta_modelo = d['ruta_modelo']
self.ruta_prompt_config = d['ruta_prompt_config']
self.fuzzy_top = d.get('fuzzy_top', 5)
self.nlg_max_items = d.get('nlg_max_items', 8)
self.n_threads = d.get('n_threads', 4)
self.ruta_diccionario_symspell =
d.get('ruta_diccionario_symspell')
class PharmaSearchAgent:
self.config = Config(config)
self._setup_logging()
self._load_prompt_config()
self._load_resources()
self._build_indices()
self._setup_templates()
self.memoria = MemorySupervisor()
self.rules = RuleMemory(self.embedder)
self.typos = TypoMemory()
self.last_query = ""
self.logger.info("✅ Sistema inicializado correctamente")
def _setup_logging(self):
import sys, io
logger = logging.getLogger("PharmaSearchAgent")
try:
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
stream = sys.stdout
except Exception:
handler = logging.StreamHandler(stream)
handler.setFormatter(fmt)
logger.handlers.clear()
logger.addHandler(handler)
logger.propagate = False
self.logger = logger
def _load_prompt_config(self):
cfg = json.load(f)
self.intents = cfg.get('intents', {
"codes": ["codigo","código","codigos","sap","sku","id"],
"forms": ["forma","farmaceutica","farmacéutica","formas"],
"presentations":["presentacion","presentaciones"],
"names":["nombre","descripcion","descripción"],
"all":["todos","todas","lista","listar"]
})
self.priority_cols = cfg.get('priority_cols', [
"presentacionesdeproducto","molecula","marca","lineacomercial",
"grupodetrabajo","formafarmaceutica","gerente","tipodemolecula","ti
podeproducto"
])
self.stopwords = set(cfg.get('stopwords', [
"que","cual","cuál","tengo","es","el","la","los","las","de","del","para","
en","con",
"maneja","ve","contiene","dime","dame","busca","buscar","listar","list
a","todos","todas"
]))
def _load_resources(self):
tasks = {
'data': self._load_data,
'emb': self._load_embeddings,
'faiss': self._load_faiss,
'frags': self._load_fragments,
'llm': self._load_llm,
'spell': self._setup_spellcheck
}
with ThreadPoolExecutor() as ex:
name = futs[fut]
try:
fut.result()
except Exception as e:
if name in ("data","llm"):
raise
def _load_data(self):
self.df = pd.read_parquet(self.config.ruta_parquet)
self.df.columns = self.df.columns.str.lower()
if col in self.df.columns:
self.df[col+"_norm"] =
self.df[col].astype(str).map(_norm_txt)
self.domain_vocab = set()
if col in self.df.columns:
self.domain_vocab.update(self.df[col].astype(str).str.lower().unique())
def _load_embeddings(self):
self.embedder =
SentenceTransformer(self.config.embedding_model, device='cpu')
def _load_faiss(self):
try:
self.faiss_index = faiss.read_index(self.config.ruta_faiss)
except Exception as e:
self.faiss_index = None
def _load_fragments(self):
with open(self.config.ruta_fragmentos,'r',encoding='utf-8') as f:
self.text_fragments = json.load(f)
def _load_llm(self):
self.llm = Llama(model_path=self.config.ruta_modelo,
n_ctx=self.config.n_ctx,
n_threads=self.config.n_threads,
verbose=False)
def _setup_spellcheck(self):
if not self.config.use_spellcheck:
self.spellchecker = None
return
self.spellchecker = SymSpell(max_dictionary_edit_distance=2,
prefix_length=7)
if self.config.ruta_diccionario_symspell:
self.spellchecker.load_dictionary(self.config.ruta_diccionario_symspell,
0, 1, encoding='utf-8')
def _build_indices(self):
self.inverted_indices = {}
if ncol in self.df.columns:
tok_series = self.df[ncol].str.split()
mapping = {}
if isinstance(toks, list):
for t in toks:
mapping.setdefault(t, set()).add(idx)
self.inverted_indices[col] = mapping
"""
"""
# 1. Preprocesamiento inteligente
analisis = self._analizar_consulta_avanzada(query_corregida)
self._registrar_interaccion(
pregunta_original=query,
pregunta_procesada=query_corregida,
analisis=analisis,
resultados=resultados,
respuesta=respuesta
# Embedding de la pregunta
embedding = self.embedder.encode([query])[0]
return {
'embedding': embedding,
'entidades': self._extraer_entidades(query),
'atributos': self._extraer_atributos(query),
'tipo_pregunta': self._clasificar_tipo_pregunta(embedding),
'preguntas_similares':
self._buscar_preguntas_similares(embedding)
}
def _busqueda_adaptativa(self, analisis: Dict) ->
Tuple[pd.DataFrame, str]:
if analisis['entidades']:
resultado = self._buscar_por_entidad(analisis['entidades']
[0])
if not resultado.empty:
return self._busqueda_semantica(analisis['embedding']),
"semantica"
plantillas = {
'entidad_atributo': (
),
'comparacion': (
),
'general': (
"Esto es lo que encontré sobre tu consulta:\
n{resultados}\n"
tipo = analisis['tipo_pregunta']
if tipo in plantillas:
return plantillas[tipo].format(
entidad=analisis['entidades'][0] if analisis['entidades']
else 'producto',
valor_entidad=resultados.iloc[0][analisis['entidades'][0]]
if analisis['entidades'] else '',
valores=", ".join(resultados[analisis['atributos']
[0]].unique()) if analisis['atributos'] else '',
productos="\n-
".join(resultados['presentacionesdeproducto'].unique()),
resultados=resultados.to_string(index=False)
"""
Returns:
"""
return pd.DataFrame()
value_norm = _norm_txt(value)
if field in self.df.columns:
if field in CODE_COLS:
else:
if f"{field}_norm" in self.df.columns:
else:
cond |= (self.df[field].astype(str).str.lower() ==
value_norm)
return self.df[cond].copy()
def _search_code(self, code: str) -> pd.DataFrame:
"""
Args:
Returns:
"""
return pd.DataFrame()
"""
Args:
Returns:
"""
return pd.DataFrame()
tokens = [t for t in _norm_txt(value).split() if len(t) > 3]
if not tokens:
return pd.DataFrame()
series = self.df[col_norm].astype(str).map(_norm_txt)
for t in tokens:
return self.df[cond].copy()
freq = self.df.groupby([target,
atributo]).size().reset_index(name='count')
if not freq.empty:
tokens = self._analizar_pregunta(pregunta)
# Extracción de entidades y atributos
entidades = self._identificar_entidades(tokens)
atributos = self._identificar_atributos(tokens)
target = entidades[0]
attr = atributos[0]
if not df.empty:
valores = df[attr].dropna().unique()
", ".join(valores[:5]) +
embedding = self.embedder.encode([pregunta])[0]
similares = self._buscar_preguntas_similares(embedding)
return {
'embedding': embedding,
'campos_relevantes': self._inferir_campos(embedding),
'tipo_respuesta': self._predecir_tipo_respuesta(embedding),
'preguntas_similares': similares
pass
campos_posibles = list(HUMANIZE.keys())
# Registro de la interacción
self.memoria.registrar_interaccion(
pregunta=pregunta,
respuesta=respuesta,
feedback=feedback,
embedding=self.embedder.encode([pregunta])[0]
self._reforzar_patron(pregunta, respuesta)
else:
self._ajustar_modelo(pregunta, respuesta)
pass
def _setup_templates(self):
self.templates = {
'single': (
),
'multi': (
"{rows}\n\n"
"{extra}"
),
'nf': (
),
'field_only': (
"{valores}\n\n"
"{extra}"
),
'typo_ask': (
result = {
"is_code": False,
"code": "",
"intent": "general",
"fields": [],
"value": "",
"dont_say": []
}
# Normalizar consulta
qn = _norm_txt(q)
code_match = re.search(r"\b\d{6,}\b", q)
if code_match:
result["is_code"] = True
result["code"] = code_match.group()
result["intent"] = "code"
result["fields"] = ["codigosap"]
result["value"] = q
return result
intent = self._detect_intent(qn)
result["intent"] = intent["type"]
if intent["fields"]:
result["fields"] = intent["fields"]
if t not in self.stopwords
ql = q.lower()
intent = {
"type": "general",
"fields": [],
"priority": 1,
"certainty": 0.8
campo_pregunta = self._detectar_campo_pregunta(q)
if campo_pregunta:
intent.update({
"type": "field_query",
"fields": [campo_pregunta],
"priority": 2,
"certainty": 0.95
})
return intent
kw_mapping = {
intent.update({
"type": intent_type,
"fields": fields,
"priority": 2,
"certainty": 0.9
})
break
if intent["type"] == "general":
tokens = ql.split()
intent["fields"].append(col)
return intent
try:
# Buscar regla similar con un umbral de similitud alta
if rule:
self.logger.debug(f"Regla recuperada:
{rule['q_text_canonica']}")
self.rules.record_use(rule['canonical_q_id'])
if valid_fields:
rule['field_hints'] = valid_fields
else:
rule['field_hints'] = self.priority_cols
return rule
return None
except Exception as e:
return None
return parsed
# Priorizar campos sugeridos por la regla
if hints:
if 'dont_say' in rule:
parsed['dont_say'] = rule['dont_say']
return parsed
tokens = text.split()
new_toks: List[str] = []
for t in tokens:
tn = _norm_txt(t)
continue
learned_corr = self.typos.get(tn)
if learned_corr:
corrected[t] = learned_corr
new_toks.append(learned_corr)
continue
if self.spellchecker:
candidates[sug.term] = max(candidates.get(sug.term,
0), score)
if not candidates:
new_toks.append(t)
continue
corrected[t] = best_term
new_toks.append(best_term)
self.typos.upsert(tn, best_term)
key=lambda x: x[1],
reverse=True)[:3]]
ambiguous[t] = top_candidates
new_toks.append(t)
new_toks.append(t)
while True:
if choice.isdigit():
choice_idx = int(choice)
if parsed.get('is_code'):
return self._search_code(parsed['code'])
if not campos:
campo_pregunta =
self._detectar_campo_pregunta(parsed['value'])
if campo_pregunta:
campos = [campo_pregunta]
else:
campos = self.priority_cols
# Intentar diferentes estrategias de búsqueda exacta
if not res.empty:
return res
if not res.empty:
return res
return pd.DataFrame()
return pd.DataFrame()
embed = self.embedder.encode([query]).astype(np.float32)
D, I = self.faiss_index.search(embed, k=10)
if not res.empty:
return res
v = _norm_txt(query)
cols =
["presentacionesdeproducto","presentacionesproductosap","molecula
","marca","lineacomercial","grupodetrabajo"]
for c in cols:
if c in self.df.columns:
cond |= self.df[c].astype(str).map(_norm_txt).str.contains(v,
regex=False)
return self.df[cond]
parsed = self._parse(query)
if rule:
df_res = self._exact_search(parsed)
if not df_res.empty:
if self.config.semantic_fallback:
df_res = self._semantic_fallback(query)
if not df_res.empty:
df_res = self._fuzzy_search(parsed)
if not df_res.empty:
df_res = self._wide_search(query)
if not df_res.empty:
q = _norm_txt(pregunta)
if campo in q or _norm_txt(nombre) in q:
return campo
if hasattr(self, "aliases"):
if alias in q:
return campo
return None
t0 = time.time()
if auto_corr:
if ambig:
user_fix = input(self.templates['typo_ask'].format(lista=lista)
+ "\n> ").strip()
if user_fix:
fixed_q = user_fix
for c in cands:
if c in _norm_txt(user_fix).split():
self.typos.upsert(_norm_txt(orig), c)
break
# Detectar si la pregunta es sobre un campo específico
campo_pregunta = self._detectar_campo_pregunta(fixed_q)
rule = self._retrieve_rule(fixed_q)
if df_res.empty:
use_llm = self.config.nlg_use_llm
if modo == "exacta":
resp = PROMPT_RETRY.format(respuesta=self._gen_nlg(df_res,
fixed_q, modo=modo, style_hint=style_hint, use_llm=use_llm))
resp = PROMPT_SUGERENCIA
else:
resp = self._postprocess_response(resp)
if df_res.empty:
return self._response_not_found(query)
processed_data = []
item = {}
if pd.notna(val):
item[human_col] = str(val)
processed_data.append(item)
if use_llm:
try:
prompt = PROMPT_NLG.format(
pregunta=query,
resultados_json=json.dumps(processed_data,
ensure_ascii=False, indent=2)
resp = out['choices'][0]['text'].strip()
if resp:
return resp
except Exception as e:
if len(df_res) == 1:
row = df_res.iloc[0]
data = {
return self.templates['single'].format(**data)
else:
vals = df_res[campo].dropna().astype(str).unique().tolist()
if len(vals) == 1:
else:
primeras = vals[:5]
return self.templates['field_only'].format(
campo=human.lower(),
valores=lista,
extra=extra
limit = 50 if re.search(r"\btodo[s]?\b|\btodas\b|\blista\b",
_norm_txt(query)) else self.config.nlg_max_items
df = df.drop_duplicates('codigosap',
ignore_index=True).head(limit)
lines = [
for _, r in df.iterrows()
extra = "" if len(df) < limit else "\n… Hay más resultados.
Pide \"todos\" para ver la lista completa."
sugs = self._suggest(q)
sugs = []
if 'presentacionesdeproducto' in self.df.columns:
return sugs[:3]
return text
# Eliminar duplicados
lines = text.splitlines()
unique_lines = []
seen = set()
key = line.strip().lower()
unique_lines.append(line)
seen.add(key)
cleaned = "\n".join(unique_lines)
return cleaned
def _show_available_fields(self):
print(f"{i}. {val}")
def _ask_for_correct_field(self):
while True:
if campo.isdigit():
num = int(campo)
campo_lower = campo.lower()
return key
if feedback == "s":
self._handle_positive_feedback(turno, iid)
self._handle_improvement_feedback(turno, iid)
self.memoria.mark_valid(turno, iid)
self.memoria.clear_errada_flag(turno["pregunta"])
self.memoria.add_ft_sample(turno["pregunta"],
turno["respuesta"], None)
if mej in ("s","si"):
esperado = input("Escribe la respuesta esperada tal como la
quieres ver:\n> ").strip()
self.memoria.add_ft_sample(turno["pregunta"], esperado,
None, tipo_muestra="Q/A_expected")
self.memoria.mark_valid(turno, iid)
self.memoria.clear_errada_flag(turno["pregunta"])
if issue == "a":
self._improve_format(turno, iid)
self._clarify_question(turno, iid)
self._show_available_fields()
campo = self._ask_for_correct_field()
if campo:
print(nueva_resp, "\n")
# Guardar corrección
self.rules.upsert_rule(
turno["pregunta"],
[campo],
nueva_resp,
quality=1.0
self.memoria.add_ft_sample(
turno["pregunta"],
nueva_resp,
None,
tipo_muestra="Q/A_corrected"
if confirm == "s":
self.memoria.mark_valid(turno, iid)
self.memoria.clear_errada_flag(turno["pregunta"])
self._show_available_fields()
campo = self._ask_for_correct_field()
if campo:
# Generar respuesta con la información adicional
print(nueva_resp, "\n")
# Guardar corrección
self.rules.upsert_rule(
turno["pregunta"],
[campo],
nueva_resp,
quality=1.0
self.memoria.add_ft_sample(
turno["pregunta"],
nueva_resp,
None,
tipo_muestra="Q/A_augmented"
if confirm == "s":
self.memoria.mark_valid(turno, iid)
self.memoria.clear_errada_flag(turno["pregunta"])
print(turno["respuesta"])
esperado = input("\nEscribe cómo prefieres que se formatee la
respuesta:\n> ").strip()
if esperado:
self.memoria.add_ft_sample(turno["pregunta"], esperado,
None, tipo_muestra="Q/A_formatted")
print(esperado, "\n")
if confirm == "s":
self.memoria.mark_valid(turno, iid)
self.memoria.clear_errada_flag(turno["pregunta"])
if nueva_preg:
self.memoria.mark_errada(turno, iid)
def run_cli(self):
while True:
self.last_query = q
if q.lower() in ("salir","exit","quit"):
break
if q.lower() == "campos":
self._show_available_fields()
continue
if q.lower().startswith("exportar"):
path = os.path.join(APRENDIZAJE_DIR,
f"historial_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
self.memoria.export_session(path)
continue
try:
t_inicio = time.time()
print("\n🤖", respuesta)
"modo_busqueda": modo,
"fecha": datetime.now().isoformat(timespec='seconds')
})
# Feedback loop
while True:
fb = {"si":"s","mejorar":"m","no":"n"}.get(fb, fb)
if fb not in ("s","m","n"):
continue
break
except Exception as e:
self.logger.error(f"Error: {e}")
# ===================== MAIN
=====================
if __name__ == "__main__":
config = {
'ruta_parquet': r"D:\Proyecto_Smart\datos_limpios\productos\
tabla_productos_limpia.parquet",
'ruta_faiss': r"D:\Proyecto_Smart\datos_limpios\productos\
index_productos.faiss",
'ruta_fragmentos': r"D:\Proyecto_Smart\datos_limpios\productos\
fragmentos_texto_productos.json",
'ruta_modelo': r"D:\Proyecto_Smart\modelos_generativos\
deepseek-llm-7b-chat.Q8_0.gguf",
'ruta_prompt_config': r"D:\Proyecto_Smart\datos_limpios\
productos\prompt_config.json",
'embedding_model': 'all-MiniLM-L6-v2',
'fuzzy_threshold': 70,
'fuzzy_top': 5,
'ruta_diccionario_symspell': r"D:\Proyecto_Smart\datos_limpios\
diccionario\frequency_dictionary_final.txt",
'nlg_max_items': 8,
'n_ctx': 4096,
'n_threads': 4,
'use_spellcheck': True,
'nlg_use_llm': True,
'semantic_fallback': True,
'debug': False,
'block_erradas': False
agent = PharmaSearchAgent(config)
agent.run_cli()