El engaño de los threads que te está costando rendimiento
Voy directo al grano.
Si llevas más de 6 meses programando en Python y sigues usando threads para “hacer tu código más rápido”, este documento te va a doler. Pero es necesario.
Los threads en Python no funcionan como en Java o C++. Y si no entiendes por qué, estás escribiendo código lento creyendo que es rápido.
Vamos a destrozar los 7 errores más comunes que veo cada día en código de producción.
1. Hacer peticiones HTTP en paralelo
El error de los 100 threads para 100 peticiones
Todos hemos hecho esto. Necesitas llamar a 100 endpoints de una API y piensas: “voy a crear 100 threads”. Acabas de desperdiciar 1GB de RAM para nada.
Lo que estás haciendo mal:
import threading
import requests
import time
def obtener_datos_api(url, resultados, indice):
"""Cada thread hace una petición"""
response = requests.get(url)
resultados[indice] = response.json()
# MAL - Un thread por petición
urls = [f"https://api.ejemplo.com/usuario/{i}" for i in range(100)]
resultados = [None] * 100
threads = []
inicio = time.time()
for i, url in enumerate(urls):
t = threading.Thread(target=obtener_datos_api, args=(url, resultados, i))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Tiempo con threads: {time.time() - inicio:.2f}s")
print(f"Memoria desperdiciada: ~{len(threads) * 8}MB")
Por qué es un desastre:
- Cada thread consume 8-16MB solo por existir
- El GIL impide que se ejecuten en paralelo real
- La sincronización añade overhead innecesario
- requests.get() ya es I/O no bloqueante internamente
La solución que deberías usar:
import asyncio
import aiohttp
import time
async def obtener_datos_async(session, url):
"""Una corutina ligera por petición"""
async with session.get(url) as response:
return await response.json()
async def obtener_todos_los_datos():
urls = [f"https://api.ejemplo.com/usuario/{i}" for i in range(100)]
# Un solo objeto session reutilizable
async with aiohttp.ClientSession() as session:
# Crear todas las tareas
tareas = [obtener_datos_async(session, url) for url in urls]
# Ejecutar todas en paralelo
return await asyncio.gather(*tareas)
# BIEN - Sin threads, 100x menos memoria
inicio = time.time()
resultados = asyncio.run(obtener_todos_los_datos())
print(f"Tiempo con asyncio: {time.time() - inicio:.2f}s")
print(f"Memoria usada: ~2MB") # En lugar de 800MB
# Bonus: Control de concurrencia
async def obtener_con_limite():
"""Limita a 10 conexiones simultáneas"""
urls = [f"https://api.ejemplo.com/usuario/{i}" for i in range(100)]
# Semáforo para limitar concurrencia
limite = asyncio.Semaphore(10)
async def fetch_limitado(session, url):
async with limite: # Máximo 10 a la vez
return await obtener_datos_async(session, url)
async with aiohttp.ClientSession() as session:
tareas = [fetch_limitado(session, url) for url in urls]
return await asyncio.gather(*tareas)
Diferencia real en producción:
- Threads: 100 peticiones = 800MB RAM, 5 segundos
- Asyncio: 100 peticiones = 2MB RAM, 2 segundos
2. Procesar archivos línea por línea
Cuando los threads matan el rendimiento de I/O
Crear un thread por archivo o por chunk de archivo es el error más caro que puedes cometer.
El código que todos hemos escrito:
import threading
import time
def procesar_archivo_con_thread(nombre_archivo, resultados):
"""Procesa un archivo en un thread separado"""
contador_lineas = 0
suma_valores = 0
with open(nombre_archivo, 'r') as f:
for linea in f:
# Simular procesamiento
valores = linea.strip().split(',')
if valores:
contador_lineas += 1
suma_valores += len(valores)
resultados[nombre_archivo] = {
'lineas': contador_lineas,
'suma': suma_valores
}
# MAL - Un thread por archivo
archivos = [f"datos_{i}.csv" for i in range(10)]
resultados = {}
threads = []
for archivo in archivos:
t = threading.Thread(target=procesar_archivo_con_thread, args=(archivo, resultados))
threads.append(t)
t.start()
for t in threads:
t.join()
Por qué falla:
- El GIL serializa todo el procesamiento Python
- Los threads compiten por acceso al disco
- El cambio de contexto añade latencia
- No aprovechas el buffer del sistema operativo
La forma correcta con generadores:
import asyncio
import aiofiles
from pathlib import Path
async def procesar_archivo_async(nombre_archivo):
"""Procesa archivo sin bloquear"""
contador_lineas = 0
suma_valores = 0
async with aiofiles.open(nombre_archivo, 'r') as f:
async for linea in f:
# Procesamiento sin bloquear el event loop
valores = linea.strip().split(',')
if valores:
contador_lineas += 1
suma_valores += len(valores)
# Dar control al event loop cada 1000 líneas
if contador_lineas % 1000 == 0:
await asyncio.sleep(0)
return {
'archivo': nombre_archivo,
'lineas': contador_lineas,
'suma': suma_valores
}
async def procesar_todos_los_archivos():
archivos = [f"datos_{i}.csv" for i in range(10)]
# Procesar todos en paralelo
tareas = [procesar_archivo_async(archivo) for archivo in archivos]
return await asyncio.gather(*tareas)
# Ejecutar
resultados = asyncio.run(procesar_todos_los_archivos())
# Alternativa: Generador eficiente para archivos grandes
def procesar_archivos_eficiente(archivos):
"""Usa generadores para memoria constante"""
def procesar_linea(linea):
# Tu lógica aquí
return len(linea.strip().split(','))
for archivo in archivos:
# Generador que no carga todo en memoria
with open(archivo, 'r') as f:
# Procesar en chunks sin threads
resultado = sum(
procesar_linea(linea)
for linea in f
if linea.strip()
)
yield archivo, resultado
# Uso eficiente sin threads ni async
archivos = [f"datos_{i}.csv" for i in range(10)]
for archivo, resultado in procesar_archivos_eficiente(archivos):
print(f"{archivo}: {resultado}")
La solución óptima para archivos enormes:
from multiprocessing import Pool
import mmap
import os
def procesar_chunk_mmap(args):
"""Procesa un chunk de archivo con memory mapping"""
nombre_archivo, inicio, fin = args
with open(nombre_archivo, 'r+b') as f:
# Memory mapping - no carga el archivo en RAM
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmapped:
# Procesar solo el chunk asignado
chunk = mmapped[inicio:fin]
# Tu procesamiento aquí
lineas = chunk.count(b'\n')
return lineas
def procesar_archivo_grande_paralelo(nombre_archivo, num_procesos=4):
"""Divide archivo grande en chunks para procesamiento paralelo"""
# Obtener tamaño del archivo
tamano = os.path.getsize(nombre_archivo)
chunk_size = tamano // num_procesos
# Crear argumentos para cada proceso
chunks = []
for i in range(num_procesos):
inicio = i * chunk_size
fin = tamano if i == num_procesos - 1 else (i + 1) * chunk_size
chunks.append((nombre_archivo, inicio, fin))
# Procesar en paralelo REAL
with Pool(num_procesos) as pool:
resultados = pool.map(procesar_chunk_mmap, chunks)
return sum(resultados)
# Para archivo de 1GB - 4x más rápido que threads
total_lineas = procesar_archivo_grande_paralelo("archivo_enorme.txt")
3. Actualizar una GUI (Tkinter, PyQt, etc.)
El error clásico que congela tu interfaz
Si usas threads para actualizar la GUI, estás creando race conditions y crashes aleatorios.
Lo que NO debes hacer:
import tkinter as tk
import threading
import time
import random
class AppConThreads:
def __init__(self, root):
self.root = root
self.label = tk.Label(root, text="Contador: 0")
self.label.pack()
self.boton = tk.Button(root, text="Iniciar", command=self.iniciar_contador)
self.boton.pack()
self.contador = 0
def actualizar_gui_desde_thread(self):
"""PELIGRO - Actualizar GUI desde thread"""
for i in range(100):
time.sleep(0.1)
self.contador = i
# ESTO CAUSARÁ CRASHES ALEATORIOS
self.label.config(text=f"Contador: {self.contador}")
# A veces funciona, a veces crash, siempre impredecible
def iniciar_contador(self):
# MAL - Thread actualizando GUI directamente
t = threading.Thread(target=self.actualizar_gui_desde_thread)
t.daemon = True
t.start()
# Esto crasheará aleatoriamente
root = tk.Tk()
app = AppConThreads(root)
root.mainloop()
Por qué explota:
- Las GUIs NO son thread-safe
- Tkinter usa Tcl/Tk que tiene su propio thread
- Las actualizaciones desde threads causan corrupción de memoria
- Los crashes son impredecibles y difíciles de debuggear
La solución correcta con colas:
import tkinter as tk
import queue
import time
from threading import Thread
class AppSegura:
def __init__(self, root):
self.root = root
self.label = tk.Label(root, text="Contador: 0")
self.label.pack()
self.progress = tk.Label(root, text="Progreso: 0%")
self.progress.pack()
self.boton = tk.Button(root, text="Iniciar", command=self.iniciar_proceso)
self.boton.pack()
# Cola thread-safe para comunicación
self.cola_updates = queue.Queue()
# Iniciar loop de actualización
self.procesar_cola()
def trabajo_en_background(self):
"""Trabajo pesado en thread separado"""
for i in range(101):
time.sleep(0.05) # Simular trabajo
# Enviar actualización a través de la cola
self.cola_updates.put({
'tipo': 'contador',
'valor': i
})
if i % 10 == 0:
self.cola_updates.put({
'tipo': 'progreso',
'valor': i
})
def procesar_cola(self):
"""Procesa actualizaciones en el thread principal"""
try:
while True:
# No bloquear, procesar todo lo disponible
update = self.cola_updates.get_nowait()
if update['tipo'] == 'contador':
self.label.config(text=f"Contador: {update['valor']}")
elif update['tipo'] == 'progreso':
self.progress.config(text=f"Progreso: {update['valor']}%")
except queue.Empty:
pass
# Programar siguiente check
self.root.after(50, self.procesar_cola)
def iniciar_proceso(self):
"""Inicia trabajo en background de forma segura"""
thread = Thread(target=self.trabajo_en_background)
thread.daemon = True
thread.start()
# Versión moderna con asyncio y tkinter
import asyncio
class AppAsync:
def __init__(self, root):
self.root = root
self.label = tk.Label(root, text="Estado: Esperando")
self.label.pack()
self.boton = tk.Button(
root,
text="Iniciar Async",
command=lambda: asyncio.create_task(self.proceso_async())
)
self.boton.pack()
async def proceso_async(self):
"""Proceso asíncrono que no bloquea la GUI"""
for i in range(10):
self.label.config(text=f"Procesando: {i+1}/10")
# Dar control al event loop
await asyncio.sleep(0.5)
# Actualizar GUI de forma segura
self.root.update_idletasks()
self.label.config(text="Completado!")
# Para PyQt/PySide - Usar QThread correctamente
from PyQt5.QtCore import QThread, pyqtSignal, QObject
from PyQt5.QtWidgets import QApplication, QLabel, QPushButton, QVBoxLayout, QWidget
class TrabajadorSeguro(QObject):
"""Worker que emite señales en lugar de actualizar GUI"""
progreso = pyqtSignal(int)
completado = pyqtSignal(str)
def ejecutar(self):
for i in range(101):
time.sleep(0.05)
# Emitir señal - thread-safe
self.progreso.emit(i)
self.completado.emit("Proceso terminado")
class VentanaSegura(QWidget):
def __init__(self):
super().__init__()
layout = QVBoxLayout()
self.label = QLabel("0%")
layout.addWidget(self.label)
self.boton = QPushButton("Iniciar")
self.boton.clicked.connect(self.iniciar_trabajo)
layout.addWidget(self.boton)
self.setLayout(layout)
# Configurar thread y worker
self.thread = QThread()
self.worker = TrabajadorSeguro()
self.worker.moveToThread(self.thread)
# Conectar señales
self.thread.started.connect(self.worker.ejecutar)
self.worker.progreso.connect(self.actualizar_progreso)
self.worker.completado.connect(self.trabajo_completado)
def iniciar_trabajo(self):
self.thread.start()
def actualizar_progreso(self, valor):
"""Actualización thread-safe desde señal"""
self.label.setText(f"{valor}%")
def trabajo_completado(self, mensaje):
self.label.setText(mensaje)
self.thread.quit()
4. Escribir logs desde múltiples partes de tu aplicación
El caos de los logs mezclados
Si cada thread escribe logs directamente, acabarás con líneas cortadas, logs mezclados y archivos corruptos.
El desastre del logging con threads:
import threading
import logging
import time
import random
# MAL - Logger compartido sin protección
logging.basicConfig(
level=logging.INFO,
format='%(threadName)s - %(message)s',
filename='app.log'
)
def trabajador_con_logs(worker_id):
"""Cada thread escribe logs"""
for i in range(10):
# Simular trabajo
time.sleep(random.uniform(0.01, 0.1))
# PROBLEMA: Los logs se mezclan
logging.info(f"Worker {worker_id} - Paso {i}")
# Logs multilínea = CAOS TOTAL
logging.info(f"""Worker {worker_id} procesando:
- Item: {i}
- Estado: Activo
- Progreso: {i*10}%""")
# Crear threads que escriben logs
threads = []
for i in range(5):
t = threading.Thread(target=trabajador_con_logs, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
# El archivo de log será ilegible
Resultado en el log:
Thread-1 - Worker 1 - PaThread-2 - Worker 2 - Paso 0
so 0
Thread-1 - Worker 1 procesando:
Thread-3 - Worker 3 - Paso 0
- Item: 0
- Estado: Activo
Thread-2 - Worker 2 procesando:
- Progreso: 0%
La solución con QueueHandler:
import logging
import logging.handlers
import queue
import threading
import atexit
class LoggerSeguro:
"""Sistema de logging thread-safe con cola"""
def __init__(self, nombre_archivo='app.log'):
# Cola para logs
self.cola_logs = queue.Queue()
# Handler que escribe desde un solo thread
self.queue_handler = logging.handlers.QueueHandler(self.cola_logs)
# Listener que procesa la cola
file_handler = logging.FileHandler(nombre_archivo)
file_handler.setFormatter(
logging.Formatter('%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
)
self.listener = logging.handlers.QueueListener(
self.cola_logs,
file_handler
)
# Configurar logger root
self.logger = logging.getLogger()
self.logger.addHandler(self.queue_handler)
self.logger.setLevel(logging.INFO)
# Iniciar listener
self.listener.start()
# Asegurar limpieza al salir
atexit.register(self.listener.stop)
def get_logger(self, nombre=None):
"""Obtener logger para un módulo"""
return logging.getLogger(nombre)
# Sistema de logging estructurado
import json
from datetime import datetime
class LoggerEstructurado:
"""Logger que produce JSON estructurado"""
def __init__(self, nombre_archivo='app.jsonl'):
self.lock = threading.Lock()
self.archivo = nombre_archivo
self.buffer = []
self.buffer_size = 100 # Escribir cada 100 logs
def log(self, nivel, mensaje, **campos_extra):
"""Log thread-safe con datos estructurados"""
entrada = {
'timestamp': datetime.utcnow().isoformat(),
'nivel': nivel,
'thread': threading.current_thread().name,
'mensaje': mensaje,
**campos_extra
}
with self.lock:
self.buffer.append(entrada)
if len(self.buffer) >= self.buffer_size:
self._flush()
def _flush(self):
"""Escribir buffer a disco (debe llamarse con lock)"""
if not self.buffer:
return
with open(self.archivo, 'a') as f:
for entrada in self.buffer:
f.write(json.dumps(entrada) + '\n')
self.buffer.clear()
def info(self, mensaje, **kwargs):
self.log('INFO', mensaje, **kwargs)
def error(self, mensaje, **kwargs):
self.log('ERROR', mensaje, **kwargs)
def __del__(self):
"""Asegurar que se escriben todos los logs"""
with self.lock:
self._flush()
# Uso correcto
logger_seguro = LoggerEstructurado()
def trabajador_mejorado(worker_id):
"""Worker con logging estructurado"""
for i in range(10):
# Log con contexto
logger_seguro.info(
f"Procesando item",
worker_id=worker_id,
item=i,
progreso=i*10
)
try:
# Simular trabajo
resultado = procesar_item(i)
logger_seguro.info(
"Item procesado",
worker_id=worker_id,
item=i,
resultado=resultado
)
except Exception as e:
logger_seguro.error(
"Error procesando item",
worker_id=worker_id,
item=i,
error=str(e),
traceback=traceback.format_exc()
)
def procesar_item(item):
time.sleep(0.1)
return item * 2
# Alternativa: Logging centralizado con asyncio
import asyncio
import aiofiles
class LoggerAsync:
"""Logger asíncrono sin bloqueos"""
def __init__(self, archivo='app_async.log'):
self.archivo = archivo
self.cola = asyncio.Queue()
self.task = None
async def iniciar(self):
"""Inicia el procesador de logs"""
self.task = asyncio.create_task(self._procesador())
async def _procesador(self):
"""Procesa logs de la cola"""
async with aiofiles.open(self.archivo, 'a') as f:
while True:
try:
# Esperar logs con timeout
entrada = await asyncio.wait_for(
self.cola.get(),
timeout=1.0
)
# Escribir de forma asíncrona
await f.write(entrada + '\n')
await f.flush()
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
# Procesar logs restantes antes de cerrar
while not self.cola.empty():
entrada = await self.cola.get()
await f.write(entrada + '\n')
break
async def log(self, nivel, mensaje):
"""Añadir log a la cola"""
timestamp = datetime.utcnow().isoformat()
entrada = f"[{timestamp}] {nivel}: {mensaje}"
await self.cola.put(entrada)
async def cerrar(self):
"""Cerrar logger de forma limpia"""
if self.task:
self.task.cancel()
await self.task
5. Cachear resultados compartidos
La pesadilla de la cache con threads
Implementar una cache thread-safe es más difícil de lo que piensas. El 99% de las implementaciones tienen race conditions.
La cache rota que todos hemos hecho:
import threading
import time
import random
class CacheRota:
"""Cache con race conditions"""
def __init__(self):
self.cache = {}
self.lock = threading.Lock()
def obtener_o_calcular(self, clave, funcion_costosa):
# PROBLEMA 1: Check-then-act race condition
if clave not in self.cache:
# Entre estas dos líneas, otro thread puede añadir la clave
with self.lock:
# PROBLEMA 2: Doble cálculo
# Múltiples threads pueden llegar aquí
resultado = funcion_costosa()
self.cache[clave] = resultado
return resultado
return self.cache[clave]
def operacion_costosa():
"""Simula operación costosa"""
print(f"Thread {threading.current_thread().name} calculando...")
time.sleep(1)
return random.randint(1, 100)
# Múltiples threads = múltiples cálculos para la misma clave
cache = CacheRota()
threads = []
for i in range(5):
t = threading.Thread(
target=lambda: print(cache.obtener_o_calcular("key1", operacion_costosa))
)
threads.append(t)
t.start()
# Verás "calculando..." múltiples veces para la misma clave!
La solución correcta con functools:
from functools import lru_cache, wraps
import threading
import time
# Solución 1: LRU Cache thread-safe incorporada
@lru_cache(maxsize=128)
def operacion_con_cache(parametro):
"""LRU cache es thread-safe por defecto"""
print(f"Calculando para {parametro}...")
time.sleep(1)
return parametro ** 2
# Solución 2: Cache personalizada thread-safe
class CacheInteligente:
"""Cache con lock por clave y lazy loading"""
def __init__(self, ttl=60):
self.cache = {}
self.locks = {}
self.lock_principal = threading.Lock()
self.ttl = ttl
def obtener_o_calcular(self, clave, funcion):
"""Obtiene de cache o calcula, sin doble cálculo"""
# Fast path - si ya está en cache
if clave in self.cache:
valor, timestamp = self.cache[clave]
if time.time() - timestamp < self.ttl:
return valor
# Obtener lock específico para esta clave
with self.lock_principal:
if clave not in self.locks:
self.locks[clave] = threading.Lock()
lock_clave = self.locks[clave]
# Ahora solo un thread calculará para esta clave
with lock_clave:
# Double-check pattern
if clave in self.cache:
valor, timestamp = self.cache[clave]
if time.time() - timestamp < self.ttl:
return valor
# Calcular solo una vez
print(f"Calculando {clave} en {threading.current_thread().name}")
valor = funcion()
self.cache[clave] = (valor, time.time())
return valor
def limpiar_expirados(self):
"""Limpia entradas expiradas"""
with self.lock_principal:
ahora = time.time()
claves_expiradas = [
k for k, (_, timestamp) in self.cache.items()
if ahora - timestamp >= self.ttl
]
for k in claves_expiradas:
del self.cache[k]
if k in self.locks:
del self.locks[k]
# Solución 3: Cache con asyncio (la mejor opción)
import asyncio
from typing import Dict, Any, Callable, Optional
class CacheAsync:
"""Cache asíncrona sin bloqueos"""
def __init__(self, ttl: int = 60):
self.cache: Dict[str, tuple[Any, float]] = {}
self.calculando: Dict[str, asyncio.Future] = {}
self.ttl = ttl
async def obtener_o_calcular(
self,
clave: str,
funcion: Callable,
*args,
**kwargs
):
"""Obtiene de cache o calcula de forma asíncrona"""
# Verificar cache
if clave in self.cache:
valor, timestamp = self.cache[clave]
if time.time() - timestamp < self.ttl:
return valor
# Verificar si ya se está calculando
if clave in self.calculando:
# Esperar resultado del cálculo en progreso
return await self.calculando[clave]
# Crear future para que otros threads esperen
future = asyncio.Future()
self.calculando[clave] = future
try:
# Calcular valor
if asyncio.iscoroutinefunction(funcion):
valor = await funcion(*args, **kwargs)
else:
valor = await asyncio.to_thread(funcion, *args, **kwargs)
# Guardar en cache
self.cache[clave] = (valor, time.time())
# Resolver future para threads esperando
future.set_result(valor)
return valor
except Exception as e:
future.set_exception(e)
raise
finally:
# Limpiar future
del self.calculando[clave]
# Decorator para cachear cualquier función
def cache_decorator(ttl=60):
"""Decorator que añade cache thread-safe a cualquier función"""
cache = {}
locks = {}
lock_principal = threading.Lock()
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Crear clave única basada en argumentos
clave = f"{func.__name__}_{args}_{kwargs}"
# Check rápido sin lock
if clave in cache:
valor, timestamp = cache[clave]
if time.time() - timestamp < ttl:
return valor
# Obtener o crear lock para esta clave
with lock_principal:
if clave not in locks:
locks[clave] = threading.Lock()
lock = locks[clave]
# Calcular con lock
with lock:
# Double-check
if clave in cache:
valor, timestamp = cache[clave]
if time.time() - timestamp < ttl:
return valor
# Calcular
valor = func(*args, **kwargs)
cache[clave] = (valor, time.time())
return valor
return wrapper
return decorator
# Uso
@cache_decorator(ttl=30)
def operacion_costosa(x, y):
print(f"Calculando {x} + {y}")
time.sleep(1)
return x + y
# Test con múltiples threads
def test_cache():
for _ in range(3):
resultado = operacion_costosa(5, 3)
print(f"Resultado: {resultado}")
threads = []
for i in range(10):
t = threading.Thread(target=test_cache)
threads.append(t)
t.start()
# Solo verás "Calculando 5 + 3" una vez!
6. Ejecutar comandos del sistema operativo
Cuando subprocess + threads = desastre
Mezclar subprocess con threads es una receta para deadlocks y procesos zombie.
El problema con subprocess y threads:
import subprocess
import threading
import time
def ejecutar_comando_mal(comando, resultados, indice):
"""Ejecuta comando en thread - PROBLEMÁTICO"""
try:
# PROBLEMA: subprocess no es thread-safe en todas las plataformas
resultado = subprocess.run(
comando,
shell=True,
capture_output=True,
text=True,
timeout=5
)
resultados[indice] = resultado.stdout
except subprocess.TimeoutExpired:
resultados[indice] = "Timeout"
except Exception as e:
resultados[indice] = f"Error: {e}"
# MAL - Múltiples threads ejecutando subprocesos
comandos = [
"sleep 1 && echo 'Comando 1'",
"sleep 2 && echo 'Comando 2'",
"sleep 1 && echo 'Comando 3'",
]
resultados = [None] * len(comandos)
threads = []
for i, cmd in enumerate(comandos):
t = threading.Thread(target=ejecutar_comando_mal, args=(cmd, resultados, i))
threads.append(t)
t.start()
# Problemas:
# 1. File descriptor leaks
# 2. Procesos zombie
# 3. Señales perdidas
La solución correcta con asyncio:
import asyncio
import sys
async def ejecutar_comando_async(comando):
"""Ejecuta comando de forma asíncrona y segura"""
# Crear proceso asíncrono
proceso = await asyncio.create_subprocess_shell(
comando,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# Esperar resultado con timeout
try:
stdout, stderr = await asyncio.wait_for(
proceso.communicate(),
timeout=5.0
)
return {
'comando': comando,
'codigo': proceso.returncode,
'stdout': stdout.decode(),
'stderr': stderr.decode()
}
except asyncio.TimeoutError:
# Matar proceso que no responde
proceso.kill()
await proceso.wait()
return {
'comando': comando,
'error': 'Timeout después de 5 segundos'
}
async def ejecutar_comandos_paralelo(comandos):
"""Ejecuta múltiples comandos en paralelo"""
# Limitar concurrencia para no sobrecargar el sistema
semaforo = asyncio.Semaphore(5)
async def ejecutar_con_limite(cmd):
async with semaforo:
return await ejecutar_comando_async(cmd)
# Ejecutar todos en paralelo
resultados = await asyncio.gather(
*[ejecutar_con_limite(cmd) for cmd in comandos],
return_exceptions=True
)
return resultados
# Uso
comandos = [
"echo 'Hola 1' && sleep 1",
"echo 'Hola 2' && sleep 2",
"ls -la",
"pwd",
"date"
]
resultados = asyncio.run(ejecutar_comandos_paralelo(comandos))
for resultado in resultados:
if isinstance(resultado, dict):
print(f"Comando: {resultado.get('comando', 'unknown')}")
print(f"Salida: {resultado.get('stdout', resultado.get('error', ''))}")
# Solución alternativa con concurrent.futures
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import subprocess
class EjecutorComandos:
"""Ejecutor de comandos con pool de workers"""
def __init__(self, max_workers=4):
# Usar ProcessPoolExecutor para aislamiento total
self.executor = ProcessPoolExecutor(max_workers=max_workers)
@staticmethod
def _ejecutar_comando(comando, timeout=5):
"""Función estática para ejecutar en proceso separado"""
try:
resultado = subprocess.run(
comando,
shell=True,
capture_output=True,
text=True,
timeout=timeout
)
return {
'comando': comando,
'exitcode': resultado.returncode,
'stdout': resultado.stdout,
'stderr': resultado.stderr,
'success': resultado.returncode == 0
}
except subprocess.TimeoutExpired:
return {
'comando': comando,
'error': f'Timeout después de {timeout} segundos',
'success': False
}
except Exception as e:
return {
'comando': comando,
'error': str(e),
'success': False
}
def ejecutar_paralelo(self, comandos, timeout=5):
"""Ejecuta comandos en paralelo de forma segura"""
# Submit todas las tareas
futuros = [
self.executor.submit(self._ejecutar_comando, cmd, timeout)
for cmd in comandos
]
# Esperar resultados
resultados = []
for futuro in futuros:
try:
resultado = futuro.result(timeout=timeout + 1)
resultados.append(resultado)
except Exception as e:
resultados.append({
'error': f'Error ejecutando: {e}',
'success': False
})
return resultados
def cerrar(self):
"""Cierra el executor de forma limpia"""
self.executor.shutdown(wait=True)
# Uso seguro
executor = EjecutorComandos(max_workers=4)
comandos = [
"echo 'Proceso 1'",
"python -c 'print(2+2)'",
"ls -la | head -5",
"sleep 10" # Este tendrá timeout
]
resultados = executor.ejecutar_paralelo(comandos, timeout=3)
for r in resultados:
if r['success']:
print(f"✓ {r['comando']}: {r['stdout'].strip()}")
else:
print(f"✗ {r['comando']}: {r.get('error', 'Error desconocido')}")
executor.cerrar()
# Gestión avanzada con pipes y streams
async def ejecutar_con_streaming(comando):
"""Ejecuta comando y procesa salida en tiempo real"""
proceso = await asyncio.create_subprocess_shell(
comando,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT
)
# Leer salida línea por línea
async for linea in proceso.stdout:
print(f"[{comando[:20]}...]: {linea.decode().strip()}")
await proceso.wait()
return proceso.returncode
# Ejecutar comando largo con salida en tiempo real
asyncio.run(ejecutar_con_streaming("for i in {1..5}; do echo $i; sleep 1; done"))
7. Conexiones a base de datos
El infierno de los connection pools con threads
Compartir conexiones de base de datos entre threads es la causa #1 de corrupcción de datos y deadlocks.
El código que destruye tu base de datos:
import threading
import sqlite3
import time
import random
# MAL - Conexión compartida entre threads
conexion = sqlite3.connect('database.db', check_same_thread=False)
cursor = conexion.cursor()
# Crear tabla
cursor.execute('''
CREATE TABLE IF NOT EXISTS cuentas (
id INTEGER PRIMARY KEY,
saldo REAL
)
''')
cursor.execute("INSERT OR IGNORE INTO cuentas VALUES (1, 1000)")
conexion.commit()
def transferir_dinero_mal(de_cuenta, a_cuenta, cantidad):
"""Transferencia con race conditions"""
# PROBLEMA 1: Lecturas inconsistentes
cursor.execute("SELECT saldo FROM cuentas WHERE id = ?", (de_cuenta,))
saldo_origen = cursor.fetchone()[0]
# PROBLEMA 2: Entre estas líneas otro thread puede modificar
time.sleep(random.uniform(0.001, 0.01)) # Simular latencia
if saldo_origen >= cantidad:
# PROBLEMA 3: Transacciones no atómicas
cursor.execute(
"UPDATE cuentas SET saldo = saldo - ? WHERE id = ?",
(cantidad, de_cuenta)
)
cursor.execute(
"UPDATE cuentas SET saldo = saldo + ? WHERE id = ?",
(cantidad, a_cuenta)
)
conexion.commit()
print(f"Transferido {cantidad} de {de_cuenta} a {a_cuenta}")
else:
print(f"Saldo insuficiente")
# Crear más cuentas
for i in range(2, 5):
cursor.execute("INSERT OR IGNORE INTO cuentas VALUES (?, 1000)", (i,))
conexion.commit()
# Múltiples threads = CAOS
threads = []
for _ in range(10):
cuenta_origen = random.randint(1, 4)
cuenta_destino = random.randint(1, 4)
while cuenta_destino == cuenta_origen:
cuenta_destino = random.randint(1, 4)
cantidad = random.randint(10, 100)
t = threading.Thread(
target=transferir_dinero_mal,
args=(cuenta_origen, cuenta_destino, cantidad)
)
threads.append(t)
t.start()
for t in threads:
t.join()
# Verificar integridad - el dinero total cambió!
cursor.execute("SELECT SUM(saldo) FROM cuentas")
print(f"Saldo total: {cursor.fetchone()[0]} (debería ser 4000)")
La solución correcta con connection pool:
import sqlite3
from contextlib import contextmanager
import threading
import queue
import time
class PoolConexionesSeguro:
"""Pool de conexiones thread-safe"""
def __init__(self, database, max_conexiones=5):
self.database = database
self.pool = queue.Queue(maxsize=max_conexiones)
# Crear conexiones iniciales
for _ in range(max_conexiones):
conn = self._crear_conexion()
self.pool.put(conn)
def _crear_conexion(self):
"""Crea una conexión configurada correctamente"""
conn = sqlite3.connect(self.database)
conn.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging
conn.execute("PRAGMA busy_timeout=5000") # 5 segundos timeout
conn.row_factory = sqlite3.Row # Resultados como diccionarios
return conn
@contextmanager
def obtener_conexion(self):
"""Context manager para obtener conexión del pool"""
conexion = self.pool.get()
try:
yield conexion
finally:
# Rollback automático si hay transacción activa
conexion.rollback()
self.pool.put(conexion)
@contextmanager
def transaccion(self):
"""Context manager para transacciones automáticas"""
with self.obtener_conexion() as conn:
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
# Operaciones seguras con el pool
pool = PoolConexionesSeguro('database_segura.db', max_conexiones=3)
def inicializar_db():
with pool.transaccion() as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS cuentas (
id INTEGER PRIMARY KEY,
saldo REAL NOT NULL,
version INTEGER DEFAULT 0
)
''')
# Crear cuentas iniciales
for i in range(1, 5):
conn.execute(
"INSERT OR IGNORE INTO cuentas (id, saldo) VALUES (?, ?)",
(i, 1000.0)
)
def transferir_dinero_seguro(de_cuenta, a_cuenta, cantidad):
"""Transferencia con control de concurrencia optimista"""
with pool.transaccion() as conn:
# Bloqueo a nivel de fila con SELECT FOR UPDATE simulado
cursor = conn.execute(
"SELECT saldo, version FROM cuentas WHERE id = ?",
(de_cuenta,)
)
row = cursor.fetchone()
if not row:
raise ValueError(f"Cuenta {de_cuenta} no existe")
saldo_actual = row[0]
version_actual = row[1]
if saldo_actual < cantidad:
raise ValueError("Saldo insuficiente")
# Actualizar con verificación de versión
result = conn.execute("""
UPDATE cuentas
SET saldo = saldo - ?, version = version + 1
WHERE id = ? AND version = ?
""", (cantidad, de_cuenta, version_actual))
if result.rowcount == 0:
raise ValueError("Conflicto de concurrencia - reintentar")
# Actualizar cuenta destino
conn.execute(
"UPDATE cuentas SET saldo = saldo + ? WHERE id = ?",
(cantidad, a_cuenta)
)
print(f"✓ Transferido {cantidad} de cuenta {de_cuenta} a {a_cuenta}")
# Versión con SQLAlchemy (mejor para producción)
from sqlalchemy import create_engine, Column, Integer, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.pool import QueuePool
Base = declarative_base()
class Cuenta(Base):
__tablename__ = 'cuentas'
id = Column(Integer, primary_key=True)
saldo = Column(Float, nullable=False)
version = Column(Integer, default=0)
# Configurar engine con pool
engine = create_engine(
'sqlite:///database_sqlalchemy.db',
pool_size=5,
max_overflow=10,
pool_pre_ping=True, # Verificar conexiones antes de usar
poolclass=QueuePool
)
# Session factory thread-safe
SessionFactory = scoped_session(sessionmaker(bind=engine))
def transferir_con_sqlalchemy(de_cuenta, a_cuenta, cantidad):
"""Transferencia usando SQLAlchemy ORM"""
session = SessionFactory()
try:
# Bloqueo pesimista
cuenta_origen = session.query(Cuenta).filter_by(
id=de_cuenta
).with_for_update().first()
cuenta_destino = session.query(Cuenta).filter_by(
id=a_cuenta
).with_for_update().first()
if cuenta_origen.saldo >= cantidad:
cuenta_origen.saldo -= cantidad
cuenta_destino.saldo += cantidad
session.commit()
print(f"✓ SQLAlchemy: Transferido {cantidad}")
else:
session.rollback()
print("✗ Saldo insuficiente")
except Exception as e:
session.rollback()
print(f"Error: {e}")
finally:
session.close()
# Async con asyncpg (PostgreSQL)
import asyncio
import asyncpg
class PoolAsync:
"""Pool de conexiones asíncronas"""
def __init__(self):
self.pool = None
async def inicializar(self):
self.pool = await asyncpg.create_pool(
'postgresql://usuario:password@localhost/db',
min_size=2,
max_size=10
)
async def transferir_async(self, de_cuenta, a_cuenta, cantidad):
async with self.pool.acquire() as conn:
async with conn.transaction():
# Todo en una transacción
saldo = await conn.fetchval(
"SELECT saldo FROM cuentas WHERE id = $1 FOR UPDATE",
de_cuenta
)
if saldo >= cantidad:
await conn.execute(
"UPDATE cuentas SET saldo = saldo - $1 WHERE id = $2",
cantidad, de_cuenta
)
await conn.execute(
"UPDATE cuentas SET saldo = saldo + $1 WHERE id = $2",
cantidad, a_cuenta
)
return True
return False
# Test de integridad
inicializar_db()
threads = []
for _ in range(20):
t = threading.Thread(
target=transferir_dinero_seguro,
args=(
random.randint(1, 4),
random.randint(1, 4),
random.randint(10, 50)
)
)
threads.append(t)
t.start()
for t in threads:
t.join()
# Verificar integridad
with pool.obtener_conexion() as conn:
cursor = conn.execute("SELECT SUM(saldo) FROM cuentas")
total = cursor.fetchone()[0]
print(f"Saldo total final: {total} (debe ser 4000)")
Conclusión: Las reglas que debes tatuar en tu mente
Si solo recuerdas 7 cosas, que sean estas:
- Para peticiones HTTP: usa
asyncio+aiohttp, no threads - Para archivos: usa generadores o
asyncio, no threads - Para GUIs: usa colas o señales, nunca actualices desde threads
- Para logs: usa
QueueHandlero logging estructurado - Para cache: usa
lru_cacheo implementaciones lock-free - Para comandos: usa
asyncio.subprocessoProcessPoolExecutor - Para bases de datos: usa connection pools o sesiones thread-local
La regla de oro
Antes de crear un thread, hazte estas 3 preguntas:
- ¿Puedo hacerlo con
asyncio? (90% de las veces, sí) - ¿Necesito paralelismo real? (usa
multiprocessing) - ¿Es I/O bloqueante sin alternativa async? (OK, usa threads)
Recursos para no cagarla
- asyncio: https://docs.python.org/3/library/asyncio.html
- multiprocessing: https://docs.python.org/3/library/multiprocessing.html
- concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html
- threading: https://docs.python.org/3/library/threading.html (para los raros casos donde sí los necesitas)
Recuerda: el mejor código con threads es el que no tiene threads.
Ahora ve y refactoriza ese código lleno de threads que tienes en producción. Tu yo del futuro te lo agradecerá cuando no tenga que debuggear race conditions a las 3 AM.
¿Cómo has llegado hasta aquí?
Este artículo se envió a la lista de email como respuesta a una pregunta frecuente.
Puedes apuntarte a la lista de email aquí: vamosallio.com
