El engaño de los threads que te está costando rendimiento

Voy directo al grano.

Si llevas más de 6 meses programando en Python y sigues usando threads para “hacer tu código más rápido”, este documento te va a doler. Pero es necesario.

Los threads en Python no funcionan como en Java o C++. Y si no entiendes por qué, estás escribiendo código lento creyendo que es rápido.

Vamos a destrozar los 7 errores más comunes que veo cada día en código de producción.


1. Hacer peticiones HTTP en paralelo

El error de los 100 threads para 100 peticiones

Todos hemos hecho esto. Necesitas llamar a 100 endpoints de una API y piensas: “voy a crear 100 threads”. Acabas de desperdiciar 1GB de RAM para nada.

Lo que estás haciendo mal:

import threading
import requests
import time

def obtener_datos_api(url, resultados, indice):
    """Cada thread hace una petición"""
    response = requests.get(url)
    resultados[indice] = response.json()

# MAL - Un thread por petición
urls = [f"https://api.ejemplo.com/usuario/{i}" for i in range(100)]
resultados = [None] * 100
threads = []

inicio = time.time()

for i, url in enumerate(urls):
    t = threading.Thread(target=obtener_datos_api, args=(url, resultados, i))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Tiempo con threads: {time.time() - inicio:.2f}s")
print(f"Memoria desperdiciada: ~{len(threads) * 8}MB")

Por qué es un desastre:

  • Cada thread consume 8-16MB solo por existir
  • El GIL impide que se ejecuten en paralelo real
  • La sincronización añade overhead innecesario
  • requests.get() ya es I/O no bloqueante internamente

La solución que deberías usar:

import asyncio
import aiohttp
import time

async def obtener_datos_async(session, url):
    """Una corutina ligera por petición"""
    async with session.get(url) as response:
        return await response.json()

async def obtener_todos_los_datos():
    urls = [f"https://api.ejemplo.com/usuario/{i}" for i in range(100)]

    # Un solo objeto session reutilizable
    async with aiohttp.ClientSession() as session:
        # Crear todas las tareas
        tareas = [obtener_datos_async(session, url) for url in urls]

        # Ejecutar todas en paralelo
        return await asyncio.gather(*tareas)

# BIEN - Sin threads, 100x menos memoria
inicio = time.time()
resultados = asyncio.run(obtener_todos_los_datos())
print(f"Tiempo con asyncio: {time.time() - inicio:.2f}s")
print(f"Memoria usada: ~2MB")  # En lugar de 800MB

# Bonus: Control de concurrencia
async def obtener_con_limite():
    """Limita a 10 conexiones simultáneas"""
    urls = [f"https://api.ejemplo.com/usuario/{i}" for i in range(100)]

    # Semáforo para limitar concurrencia
    limite = asyncio.Semaphore(10)

    async def fetch_limitado(session, url):
        async with limite:  # Máximo 10 a la vez
            return await obtener_datos_async(session, url)

    async with aiohttp.ClientSession() as session:
        tareas = [fetch_limitado(session, url) for url in urls]
        return await asyncio.gather(*tareas)

Diferencia real en producción:

  • Threads: 100 peticiones = 800MB RAM, 5 segundos
  • Asyncio: 100 peticiones = 2MB RAM, 2 segundos

2. Procesar archivos línea por línea

Cuando los threads matan el rendimiento de I/O

Crear un thread por archivo o por chunk de archivo es el error más caro que puedes cometer.

El código que todos hemos escrito:

import threading
import time

def procesar_archivo_con_thread(nombre_archivo, resultados):
    """Procesa un archivo en un thread separado"""
    contador_lineas = 0
    suma_valores = 0

    with open(nombre_archivo, 'r') as f:
        for linea in f:
            # Simular procesamiento
            valores = linea.strip().split(',')
            if valores:
                contador_lineas += 1
                suma_valores += len(valores)

    resultados[nombre_archivo] = {
        'lineas': contador_lineas,
        'suma': suma_valores
    }

# MAL - Un thread por archivo
archivos = [f"datos_{i}.csv" for i in range(10)]
resultados = {}
threads = []

for archivo in archivos:
    t = threading.Thread(target=procesar_archivo_con_thread, args=(archivo, resultados))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Por qué falla:

  • El GIL serializa todo el procesamiento Python
  • Los threads compiten por acceso al disco
  • El cambio de contexto añade latencia
  • No aprovechas el buffer del sistema operativo

La forma correcta con generadores:

import asyncio
import aiofiles
from pathlib import Path

async def procesar_archivo_async(nombre_archivo):
    """Procesa archivo sin bloquear"""
    contador_lineas = 0
    suma_valores = 0

    async with aiofiles.open(nombre_archivo, 'r') as f:
        async for linea in f:
            # Procesamiento sin bloquear el event loop
            valores = linea.strip().split(',')
            if valores:
                contador_lineas += 1
                suma_valores += len(valores)

            # Dar control al event loop cada 1000 líneas
            if contador_lineas % 1000 == 0:
                await asyncio.sleep(0)

    return {
        'archivo': nombre_archivo,
        'lineas': contador_lineas,
        'suma': suma_valores
    }

async def procesar_todos_los_archivos():
    archivos = [f"datos_{i}.csv" for i in range(10)]

    # Procesar todos en paralelo
    tareas = [procesar_archivo_async(archivo) for archivo in archivos]
    return await asyncio.gather(*tareas)

# Ejecutar
resultados = asyncio.run(procesar_todos_los_archivos())

# Alternativa: Generador eficiente para archivos grandes
def procesar_archivos_eficiente(archivos):
    """Usa generadores para memoria constante"""

    def procesar_linea(linea):
        # Tu lógica aquí
        return len(linea.strip().split(','))

    for archivo in archivos:
        # Generador que no carga todo en memoria
        with open(archivo, 'r') as f:
            # Procesar en chunks sin threads
            resultado = sum(
                procesar_linea(linea)
                for linea in f
                if linea.strip()
            )

            yield archivo, resultado

# Uso eficiente sin threads ni async
archivos = [f"datos_{i}.csv" for i in range(10)]
for archivo, resultado in procesar_archivos_eficiente(archivos):
    print(f"{archivo}: {resultado}")

La solución óptima para archivos enormes:

from multiprocessing import Pool
import mmap
import os

def procesar_chunk_mmap(args):
    """Procesa un chunk de archivo con memory mapping"""
    nombre_archivo, inicio, fin = args

    with open(nombre_archivo, 'r+b') as f:
        # Memory mapping - no carga el archivo en RAM
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmapped:
            # Procesar solo el chunk asignado
            chunk = mmapped[inicio:fin]

            # Tu procesamiento aquí
            lineas = chunk.count(b'\n')
            return lineas

def procesar_archivo_grande_paralelo(nombre_archivo, num_procesos=4):
    """Divide archivo grande en chunks para procesamiento paralelo"""

    # Obtener tamaño del archivo
    tamano = os.path.getsize(nombre_archivo)
    chunk_size = tamano // num_procesos

    # Crear argumentos para cada proceso
    chunks = []
    for i in range(num_procesos):
        inicio = i * chunk_size
        fin = tamano if i == num_procesos - 1 else (i + 1) * chunk_size
        chunks.append((nombre_archivo, inicio, fin))

    # Procesar en paralelo REAL
    with Pool(num_procesos) as pool:
        resultados = pool.map(procesar_chunk_mmap, chunks)

    return sum(resultados)

# Para archivo de 1GB - 4x más rápido que threads
total_lineas = procesar_archivo_grande_paralelo("archivo_enorme.txt")

3. Actualizar una GUI (Tkinter, PyQt, etc.)

El error clásico que congela tu interfaz

Si usas threads para actualizar la GUI, estás creando race conditions y crashes aleatorios.

Lo que NO debes hacer:

import tkinter as tk
import threading
import time
import random

class AppConThreads:
    def __init__(self, root):
        self.root = root
        self.label = tk.Label(root, text="Contador: 0")
        self.label.pack()

        self.boton = tk.Button(root, text="Iniciar", command=self.iniciar_contador)
        self.boton.pack()

        self.contador = 0

    def actualizar_gui_desde_thread(self):
        """PELIGRO - Actualizar GUI desde thread"""
        for i in range(100):
            time.sleep(0.1)
            self.contador = i

            # ESTO CAUSARÁ CRASHES ALEATORIOS
            self.label.config(text=f"Contador: {self.contador}")

            # A veces funciona, a veces crash, siempre impredecible

    def iniciar_contador(self):
        # MAL - Thread actualizando GUI directamente
        t = threading.Thread(target=self.actualizar_gui_desde_thread)
        t.daemon = True
        t.start()

# Esto crasheará aleatoriamente
root = tk.Tk()
app = AppConThreads(root)
root.mainloop()

Por qué explota:

  • Las GUIs NO son thread-safe
  • Tkinter usa Tcl/Tk que tiene su propio thread
  • Las actualizaciones desde threads causan corrupción de memoria
  • Los crashes son impredecibles y difíciles de debuggear

La solución correcta con colas:

import tkinter as tk
import queue
import time
from threading import Thread

class AppSegura:
    def __init__(self, root):
        self.root = root
        self.label = tk.Label(root, text="Contador: 0")
        self.label.pack()

        self.progress = tk.Label(root, text="Progreso: 0%")
        self.progress.pack()

        self.boton = tk.Button(root, text="Iniciar", command=self.iniciar_proceso)
        self.boton.pack()

        # Cola thread-safe para comunicación
        self.cola_updates = queue.Queue()

        # Iniciar loop de actualización
        self.procesar_cola()

    def trabajo_en_background(self):
        """Trabajo pesado en thread separado"""
        for i in range(101):
            time.sleep(0.05)  # Simular trabajo

            # Enviar actualización a través de la cola
            self.cola_updates.put({
                'tipo': 'contador',
                'valor': i
            })

            if i % 10 == 0:
                self.cola_updates.put({
                    'tipo': 'progreso',
                    'valor': i
                })

    def procesar_cola(self):
        """Procesa actualizaciones en el thread principal"""
        try:
            while True:
                # No bloquear, procesar todo lo disponible
                update = self.cola_updates.get_nowait()

                if update['tipo'] == 'contador':
                    self.label.config(text=f"Contador: {update['valor']}")
                elif update['tipo'] == 'progreso':
                    self.progress.config(text=f"Progreso: {update['valor']}%")

        except queue.Empty:
            pass

        # Programar siguiente check
        self.root.after(50, self.procesar_cola)

    def iniciar_proceso(self):
        """Inicia trabajo en background de forma segura"""
        thread = Thread(target=self.trabajo_en_background)
        thread.daemon = True
        thread.start()

# Versión moderna con asyncio y tkinter
import asyncio

class AppAsync:
    def __init__(self, root):
        self.root = root
        self.label = tk.Label(root, text="Estado: Esperando")
        self.label.pack()

        self.boton = tk.Button(
            root,
            text="Iniciar Async",
            command=lambda: asyncio.create_task(self.proceso_async())
        )
        self.boton.pack()

    async def proceso_async(self):
        """Proceso asíncrono que no bloquea la GUI"""
        for i in range(10):
            self.label.config(text=f"Procesando: {i+1}/10")

            # Dar control al event loop
            await asyncio.sleep(0.5)

            # Actualizar GUI de forma segura
            self.root.update_idletasks()

        self.label.config(text="Completado!")

# Para PyQt/PySide - Usar QThread correctamente
from PyQt5.QtCore import QThread, pyqtSignal, QObject
from PyQt5.QtWidgets import QApplication, QLabel, QPushButton, QVBoxLayout, QWidget

class TrabajadorSeguro(QObject):
    """Worker que emite señales en lugar de actualizar GUI"""
    progreso = pyqtSignal(int)
    completado = pyqtSignal(str)

    def ejecutar(self):
        for i in range(101):
            time.sleep(0.05)
            # Emitir señal - thread-safe
            self.progreso.emit(i)

        self.completado.emit("Proceso terminado")

class VentanaSegura(QWidget):
    def __init__(self):
        super().__init__()
        layout = QVBoxLayout()

        self.label = QLabel("0%")
        layout.addWidget(self.label)

        self.boton = QPushButton("Iniciar")
        self.boton.clicked.connect(self.iniciar_trabajo)
        layout.addWidget(self.boton)

        self.setLayout(layout)

        # Configurar thread y worker
        self.thread = QThread()
        self.worker = TrabajadorSeguro()
        self.worker.moveToThread(self.thread)

        # Conectar señales
        self.thread.started.connect(self.worker.ejecutar)
        self.worker.progreso.connect(self.actualizar_progreso)
        self.worker.completado.connect(self.trabajo_completado)

    def iniciar_trabajo(self):
        self.thread.start()

    def actualizar_progreso(self, valor):
        """Actualización thread-safe desde señal"""
        self.label.setText(f"{valor}%")

    def trabajo_completado(self, mensaje):
        self.label.setText(mensaje)
        self.thread.quit()

4. Escribir logs desde múltiples partes de tu aplicación

El caos de los logs mezclados

Si cada thread escribe logs directamente, acabarás con líneas cortadas, logs mezclados y archivos corruptos.

El desastre del logging con threads:

import threading
import logging
import time
import random

# MAL - Logger compartido sin protección
logging.basicConfig(
    level=logging.INFO,
    format='%(threadName)s - %(message)s',
    filename='app.log'
)

def trabajador_con_logs(worker_id):
    """Cada thread escribe logs"""
    for i in range(10):
        # Simular trabajo
        time.sleep(random.uniform(0.01, 0.1))

        # PROBLEMA: Los logs se mezclan
        logging.info(f"Worker {worker_id} - Paso {i}")

        # Logs multilínea = CAOS TOTAL
        logging.info(f"""Worker {worker_id} procesando:
        - Item: {i}
        - Estado: Activo
        - Progreso: {i*10}%""")

# Crear threads que escriben logs
threads = []
for i in range(5):
    t = threading.Thread(target=trabajador_con_logs, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# El archivo de log será ilegible

Resultado en el log:

Thread-1 - Worker 1 - PaThread-2 - Worker 2 - Paso 0
so 0
Thread-1 - Worker 1 procesando:
Thread-3 - Worker 3 - Paso 0
        - Item: 0
        - Estado: Activo
Thread-2 - Worker 2 procesando:
        - Progreso: 0%

La solución con QueueHandler:

import logging
import logging.handlers
import queue
import threading
import atexit

class LoggerSeguro:
    """Sistema de logging thread-safe con cola"""

    def __init__(self, nombre_archivo='app.log'):
        # Cola para logs
        self.cola_logs = queue.Queue()

        # Handler que escribe desde un solo thread
        self.queue_handler = logging.handlers.QueueHandler(self.cola_logs)

        # Listener que procesa la cola
        file_handler = logging.FileHandler(nombre_archivo)
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
        )

        self.listener = logging.handlers.QueueListener(
            self.cola_logs,
            file_handler
        )

        # Configurar logger root
        self.logger = logging.getLogger()
        self.logger.addHandler(self.queue_handler)
        self.logger.setLevel(logging.INFO)

        # Iniciar listener
        self.listener.start()

        # Asegurar limpieza al salir
        atexit.register(self.listener.stop)

    def get_logger(self, nombre=None):
        """Obtener logger para un módulo"""
        return logging.getLogger(nombre)

# Sistema de logging estructurado
import json
from datetime import datetime

class LoggerEstructurado:
    """Logger que produce JSON estructurado"""

    def __init__(self, nombre_archivo='app.jsonl'):
        self.lock = threading.Lock()
        self.archivo = nombre_archivo
        self.buffer = []
        self.buffer_size = 100  # Escribir cada 100 logs

    def log(self, nivel, mensaje, **campos_extra):
        """Log thread-safe con datos estructurados"""

        entrada = {
            'timestamp': datetime.utcnow().isoformat(),
            'nivel': nivel,
            'thread': threading.current_thread().name,
            'mensaje': mensaje,
            **campos_extra
        }

        with self.lock:
            self.buffer.append(entrada)

            if len(self.buffer) >= self.buffer_size:
                self._flush()

    def _flush(self):
        """Escribir buffer a disco (debe llamarse con lock)"""
        if not self.buffer:
            return

        with open(self.archivo, 'a') as f:
            for entrada in self.buffer:
                f.write(json.dumps(entrada) + '\n')

        self.buffer.clear()

    def info(self, mensaje, **kwargs):
        self.log('INFO', mensaje, **kwargs)

    def error(self, mensaje, **kwargs):
        self.log('ERROR', mensaje, **kwargs)

    def __del__(self):
        """Asegurar que se escriben todos los logs"""
        with self.lock:
            self._flush()

# Uso correcto
logger_seguro = LoggerEstructurado()

def trabajador_mejorado(worker_id):
    """Worker con logging estructurado"""
    for i in range(10):
        # Log con contexto
        logger_seguro.info(
            f"Procesando item",
            worker_id=worker_id,
            item=i,
            progreso=i*10
        )

        try:
            # Simular trabajo
            resultado = procesar_item(i)

            logger_seguro.info(
                "Item procesado",
                worker_id=worker_id,
                item=i,
                resultado=resultado
            )

        except Exception as e:
            logger_seguro.error(
                "Error procesando item",
                worker_id=worker_id,
                item=i,
                error=str(e),
                traceback=traceback.format_exc()
            )

def procesar_item(item):
    time.sleep(0.1)
    return item * 2

# Alternativa: Logging centralizado con asyncio
import asyncio
import aiofiles

class LoggerAsync:
    """Logger asíncrono sin bloqueos"""

    def __init__(self, archivo='app_async.log'):
        self.archivo = archivo
        self.cola = asyncio.Queue()
        self.task = None

    async def iniciar(self):
        """Inicia el procesador de logs"""
        self.task = asyncio.create_task(self._procesador())

    async def _procesador(self):
        """Procesa logs de la cola"""
        async with aiofiles.open(self.archivo, 'a') as f:
            while True:
                try:
                    # Esperar logs con timeout
                    entrada = await asyncio.wait_for(
                        self.cola.get(),
                        timeout=1.0
                    )

                    # Escribir de forma asíncrona
                    await f.write(entrada + '\n')
                    await f.flush()

                except asyncio.TimeoutError:
                    continue
                except asyncio.CancelledError:
                    # Procesar logs restantes antes de cerrar
                    while not self.cola.empty():
                        entrada = await self.cola.get()
                        await f.write(entrada + '\n')
                    break

    async def log(self, nivel, mensaje):
        """Añadir log a la cola"""
        timestamp = datetime.utcnow().isoformat()
        entrada = f"[{timestamp}] {nivel}: {mensaje}"
        await self.cola.put(entrada)

    async def cerrar(self):
        """Cerrar logger de forma limpia"""
        if self.task:
            self.task.cancel()
            await self.task

5. Cachear resultados compartidos

La pesadilla de la cache con threads

Implementar una cache thread-safe es más difícil de lo que piensas. El 99% de las implementaciones tienen race conditions.

La cache rota que todos hemos hecho:

import threading
import time
import random

class CacheRota:
    """Cache con race conditions"""
    def __init__(self):
        self.cache = {}
        self.lock = threading.Lock()

    def obtener_o_calcular(self, clave, funcion_costosa):
        # PROBLEMA 1: Check-then-act race condition
        if clave not in self.cache:
            # Entre estas dos líneas, otro thread puede añadir la clave
            with self.lock:
                # PROBLEMA 2: Doble cálculo
                # Múltiples threads pueden llegar aquí
                resultado = funcion_costosa()
                self.cache[clave] = resultado
                return resultado

        return self.cache[clave]

def operacion_costosa():
    """Simula operación costosa"""
    print(f"Thread {threading.current_thread().name} calculando...")
    time.sleep(1)
    return random.randint(1, 100)

# Múltiples threads = múltiples cálculos para la misma clave
cache = CacheRota()
threads = []

for i in range(5):
    t = threading.Thread(
        target=lambda: print(cache.obtener_o_calcular("key1", operacion_costosa))
    )
    threads.append(t)
    t.start()

# Verás "calculando..." múltiples veces para la misma clave!

La solución correcta con functools:

from functools import lru_cache, wraps
import threading
import time

# Solución 1: LRU Cache thread-safe incorporada
@lru_cache(maxsize=128)
def operacion_con_cache(parametro):
    """LRU cache es thread-safe por defecto"""
    print(f"Calculando para {parametro}...")
    time.sleep(1)
    return parametro ** 2

# Solución 2: Cache personalizada thread-safe
class CacheInteligente:
    """Cache con lock por clave y lazy loading"""

    def __init__(self, ttl=60):
        self.cache = {}
        self.locks = {}
        self.lock_principal = threading.Lock()
        self.ttl = ttl

    def obtener_o_calcular(self, clave, funcion):
        """Obtiene de cache o calcula, sin doble cálculo"""

        # Fast path - si ya está en cache
        if clave in self.cache:
            valor, timestamp = self.cache[clave]
            if time.time() - timestamp < self.ttl:
                return valor

        # Obtener lock específico para esta clave
        with self.lock_principal:
            if clave not in self.locks:
                self.locks[clave] = threading.Lock()
            lock_clave = self.locks[clave]

        # Ahora solo un thread calculará para esta clave
        with lock_clave:
            # Double-check pattern
            if clave in self.cache:
                valor, timestamp = self.cache[clave]
                if time.time() - timestamp < self.ttl:
                    return valor

            # Calcular solo una vez
            print(f"Calculando {clave} en {threading.current_thread().name}")
            valor = funcion()
            self.cache[clave] = (valor, time.time())

            return valor

    def limpiar_expirados(self):
        """Limpia entradas expiradas"""
        with self.lock_principal:
            ahora = time.time()
            claves_expiradas = [
                k for k, (_, timestamp) in self.cache.items()
                if ahora - timestamp >= self.ttl
            ]

            for k in claves_expiradas:
                del self.cache[k]
                if k in self.locks:
                    del self.locks[k]

# Solución 3: Cache con asyncio (la mejor opción)
import asyncio
from typing import Dict, Any, Callable, Optional

class CacheAsync:
    """Cache asíncrona sin bloqueos"""

    def __init__(self, ttl: int = 60):
        self.cache: Dict[str, tuple[Any, float]] = {}
        self.calculando: Dict[str, asyncio.Future] = {}
        self.ttl = ttl

    async def obtener_o_calcular(
        self,
        clave: str,
        funcion: Callable,
        *args,
        **kwargs
    ):
        """Obtiene de cache o calcula de forma asíncrona"""

        # Verificar cache
        if clave in self.cache:
            valor, timestamp = self.cache[clave]
            if time.time() - timestamp < self.ttl:
                return valor

        # Verificar si ya se está calculando
        if clave in self.calculando:
            # Esperar resultado del cálculo en progreso
            return await self.calculando[clave]

        # Crear future para que otros threads esperen
        future = asyncio.Future()
        self.calculando[clave] = future

        try:
            # Calcular valor
            if asyncio.iscoroutinefunction(funcion):
                valor = await funcion(*args, **kwargs)
            else:
                valor = await asyncio.to_thread(funcion, *args, **kwargs)

            # Guardar en cache
            self.cache[clave] = (valor, time.time())

            # Resolver future para threads esperando
            future.set_result(valor)

            return valor

        except Exception as e:
            future.set_exception(e)
            raise
        finally:
            # Limpiar future
            del self.calculando[clave]

# Decorator para cachear cualquier función
def cache_decorator(ttl=60):
    """Decorator que añade cache thread-safe a cualquier función"""
    cache = {}
    locks = {}
    lock_principal = threading.Lock()

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Crear clave única basada en argumentos
            clave = f"{func.__name__}_{args}_{kwargs}"

            # Check rápido sin lock
            if clave in cache:
                valor, timestamp = cache[clave]
                if time.time() - timestamp < ttl:
                    return valor

            # Obtener o crear lock para esta clave
            with lock_principal:
                if clave not in locks:
                    locks[clave] = threading.Lock()
                lock = locks[clave]

            # Calcular con lock
            with lock:
                # Double-check
                if clave in cache:
                    valor, timestamp = cache[clave]
                    if time.time() - timestamp < ttl:
                        return valor

                # Calcular
                valor = func(*args, **kwargs)
                cache[clave] = (valor, time.time())
                return valor

        return wrapper
    return decorator

# Uso
@cache_decorator(ttl=30)
def operacion_costosa(x, y):
    print(f"Calculando {x} + {y}")
    time.sleep(1)
    return x + y

# Test con múltiples threads
def test_cache():
    for _ in range(3):
        resultado = operacion_costosa(5, 3)
        print(f"Resultado: {resultado}")

threads = []
for i in range(10):
    t = threading.Thread(target=test_cache)
    threads.append(t)
    t.start()

# Solo verás "Calculando 5 + 3" una vez!

6. Ejecutar comandos del sistema operativo

Cuando subprocess + threads = desastre

Mezclar subprocess con threads es una receta para deadlocks y procesos zombie.

El problema con subprocess y threads:

import subprocess
import threading
import time

def ejecutar_comando_mal(comando, resultados, indice):
    """Ejecuta comando en thread - PROBLEMÁTICO"""
    try:
        # PROBLEMA: subprocess no es thread-safe en todas las plataformas
        resultado = subprocess.run(
            comando,
            shell=True,
            capture_output=True,
            text=True,
            timeout=5
        )
        resultados[indice] = resultado.stdout
    except subprocess.TimeoutExpired:
        resultados[indice] = "Timeout"
    except Exception as e:
        resultados[indice] = f"Error: {e}"

# MAL - Múltiples threads ejecutando subprocesos
comandos = [
    "sleep 1 && echo 'Comando 1'",
    "sleep 2 && echo 'Comando 2'",
    "sleep 1 && echo 'Comando 3'",
]

resultados = [None] * len(comandos)
threads = []

for i, cmd in enumerate(comandos):
    t = threading.Thread(target=ejecutar_comando_mal, args=(cmd, resultados, i))
    threads.append(t)
    t.start()

# Problemas:
# 1. File descriptor leaks
# 2. Procesos zombie
# 3. Señales perdidas

La solución correcta con asyncio:

import asyncio
import sys

async def ejecutar_comando_async(comando):
    """Ejecuta comando de forma asíncrona y segura"""

    # Crear proceso asíncrono
    proceso = await asyncio.create_subprocess_shell(
        comando,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # Esperar resultado con timeout
    try:
        stdout, stderr = await asyncio.wait_for(
            proceso.communicate(),
            timeout=5.0
        )

        return {
            'comando': comando,
            'codigo': proceso.returncode,
            'stdout': stdout.decode(),
            'stderr': stderr.decode()
        }

    except asyncio.TimeoutError:
        # Matar proceso que no responde
        proceso.kill()
        await proceso.wait()

        return {
            'comando': comando,
            'error': 'Timeout después de 5 segundos'
        }

async def ejecutar_comandos_paralelo(comandos):
    """Ejecuta múltiples comandos en paralelo"""

    # Limitar concurrencia para no sobrecargar el sistema
    semaforo = asyncio.Semaphore(5)

    async def ejecutar_con_limite(cmd):
        async with semaforo:
            return await ejecutar_comando_async(cmd)

    # Ejecutar todos en paralelo
    resultados = await asyncio.gather(
        *[ejecutar_con_limite(cmd) for cmd in comandos],
        return_exceptions=True
    )

    return resultados

# Uso
comandos = [
    "echo 'Hola 1' && sleep 1",
    "echo 'Hola 2' && sleep 2",
    "ls -la",
    "pwd",
    "date"
]

resultados = asyncio.run(ejecutar_comandos_paralelo(comandos))

for resultado in resultados:
    if isinstance(resultado, dict):
        print(f"Comando: {resultado.get('comando', 'unknown')}")
        print(f"Salida: {resultado.get('stdout', resultado.get('error', ''))}")

# Solución alternativa con concurrent.futures
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import subprocess

class EjecutorComandos:
    """Ejecutor de comandos con pool de workers"""

    def __init__(self, max_workers=4):
        # Usar ProcessPoolExecutor para aislamiento total
        self.executor = ProcessPoolExecutor(max_workers=max_workers)

    @staticmethod
    def _ejecutar_comando(comando, timeout=5):
        """Función estática para ejecutar en proceso separado"""
        try:
            resultado = subprocess.run(
                comando,
                shell=True,
                capture_output=True,
                text=True,
                timeout=timeout
            )

            return {
                'comando': comando,
                'exitcode': resultado.returncode,
                'stdout': resultado.stdout,
                'stderr': resultado.stderr,
                'success': resultado.returncode == 0
            }

        except subprocess.TimeoutExpired:
            return {
                'comando': comando,
                'error': f'Timeout después de {timeout} segundos',
                'success': False
            }
        except Exception as e:
            return {
                'comando': comando,
                'error': str(e),
                'success': False
            }

    def ejecutar_paralelo(self, comandos, timeout=5):
        """Ejecuta comandos en paralelo de forma segura"""

        # Submit todas las tareas
        futuros = [
            self.executor.submit(self._ejecutar_comando, cmd, timeout)
            for cmd in comandos
        ]

        # Esperar resultados
        resultados = []
        for futuro in futuros:
            try:
                resultado = futuro.result(timeout=timeout + 1)
                resultados.append(resultado)
            except Exception as e:
                resultados.append({
                    'error': f'Error ejecutando: {e}',
                    'success': False
                })

        return resultados

    def cerrar(self):
        """Cierra el executor de forma limpia"""
        self.executor.shutdown(wait=True)

# Uso seguro
executor = EjecutorComandos(max_workers=4)

comandos = [
    "echo 'Proceso 1'",
    "python -c 'print(2+2)'",
    "ls -la | head -5",
    "sleep 10"  # Este tendrá timeout
]

resultados = executor.ejecutar_paralelo(comandos, timeout=3)

for r in resultados:
    if r['success']:
        print(f"✓ {r['comando']}: {r['stdout'].strip()}")
    else:
        print(f"✗ {r['comando']}: {r.get('error', 'Error desconocido')}")

executor.cerrar()

# Gestión avanzada con pipes y streams
async def ejecutar_con_streaming(comando):
    """Ejecuta comando y procesa salida en tiempo real"""

    proceso = await asyncio.create_subprocess_shell(
        comando,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT
    )

    # Leer salida línea por línea
    async for linea in proceso.stdout:
        print(f"[{comando[:20]}...]: {linea.decode().strip()}")

    await proceso.wait()
    return proceso.returncode

# Ejecutar comando largo con salida en tiempo real
asyncio.run(ejecutar_con_streaming("for i in {1..5}; do echo $i; sleep 1; done"))

7. Conexiones a base de datos

El infierno de los connection pools con threads

Compartir conexiones de base de datos entre threads es la causa #1 de corrupcción de datos y deadlocks.

El código que destruye tu base de datos:

import threading
import sqlite3
import time
import random

# MAL - Conexión compartida entre threads
conexion = sqlite3.connect('database.db', check_same_thread=False)
cursor = conexion.cursor()

# Crear tabla
cursor.execute('''
    CREATE TABLE IF NOT EXISTS cuentas (
        id INTEGER PRIMARY KEY,
        saldo REAL
    )
''')
cursor.execute("INSERT OR IGNORE INTO cuentas VALUES (1, 1000)")
conexion.commit()

def transferir_dinero_mal(de_cuenta, a_cuenta, cantidad):
    """Transferencia con race conditions"""

    # PROBLEMA 1: Lecturas inconsistentes
    cursor.execute("SELECT saldo FROM cuentas WHERE id = ?", (de_cuenta,))
    saldo_origen = cursor.fetchone()[0]

    # PROBLEMA 2: Entre estas líneas otro thread puede modificar
    time.sleep(random.uniform(0.001, 0.01))  # Simular latencia

    if saldo_origen >= cantidad:
        # PROBLEMA 3: Transacciones no atómicas
        cursor.execute(
            "UPDATE cuentas SET saldo = saldo - ? WHERE id = ?",
            (cantidad, de_cuenta)
        )
        cursor.execute(
            "UPDATE cuentas SET saldo = saldo + ? WHERE id = ?",
            (cantidad, a_cuenta)
        )
        conexion.commit()
        print(f"Transferido {cantidad} de {de_cuenta} a {a_cuenta}")
    else:
        print(f"Saldo insuficiente")

# Crear más cuentas
for i in range(2, 5):
    cursor.execute("INSERT OR IGNORE INTO cuentas VALUES (?, 1000)", (i,))
conexion.commit()

# Múltiples threads = CAOS
threads = []
for _ in range(10):
    cuenta_origen = random.randint(1, 4)
    cuenta_destino = random.randint(1, 4)
    while cuenta_destino == cuenta_origen:
        cuenta_destino = random.randint(1, 4)

    cantidad = random.randint(10, 100)

    t = threading.Thread(
        target=transferir_dinero_mal,
        args=(cuenta_origen, cuenta_destino, cantidad)
    )
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# Verificar integridad - el dinero total cambió!
cursor.execute("SELECT SUM(saldo) FROM cuentas")
print(f"Saldo total: {cursor.fetchone()[0]} (debería ser 4000)")

La solución correcta con connection pool:

import sqlite3
from contextlib import contextmanager
import threading
import queue
import time

class PoolConexionesSeguro:
    """Pool de conexiones thread-safe"""

    def __init__(self, database, max_conexiones=5):
        self.database = database
        self.pool = queue.Queue(maxsize=max_conexiones)

        # Crear conexiones iniciales
        for _ in range(max_conexiones):
            conn = self._crear_conexion()
            self.pool.put(conn)

    def _crear_conexion(self):
        """Crea una conexión configurada correctamente"""
        conn = sqlite3.connect(self.database)
        conn.execute("PRAGMA journal_mode=WAL")  # Write-Ahead Logging
        conn.execute("PRAGMA busy_timeout=5000")  # 5 segundos timeout
        conn.row_factory = sqlite3.Row  # Resultados como diccionarios
        return conn

    @contextmanager
    def obtener_conexion(self):
        """Context manager para obtener conexión del pool"""
        conexion = self.pool.get()
        try:
            yield conexion
        finally:
            # Rollback automático si hay transacción activa
            conexion.rollback()
            self.pool.put(conexion)

    @contextmanager
    def transaccion(self):
        """Context manager para transacciones automáticas"""
        with self.obtener_conexion() as conn:
            try:
                yield conn
                conn.commit()
            except Exception:
                conn.rollback()
                raise

# Operaciones seguras con el pool
pool = PoolConexionesSeguro('database_segura.db', max_conexiones=3)

def inicializar_db():
    with pool.transaccion() as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS cuentas (
                id INTEGER PRIMARY KEY,
                saldo REAL NOT NULL,
                version INTEGER DEFAULT 0
            )
        ''')

        # Crear cuentas iniciales
        for i in range(1, 5):
            conn.execute(
                "INSERT OR IGNORE INTO cuentas (id, saldo) VALUES (?, ?)",
                (i, 1000.0)
            )

def transferir_dinero_seguro(de_cuenta, a_cuenta, cantidad):
    """Transferencia con control de concurrencia optimista"""

    with pool.transaccion() as conn:
        # Bloqueo a nivel de fila con SELECT FOR UPDATE simulado
        cursor = conn.execute(
            "SELECT saldo, version FROM cuentas WHERE id = ?",
            (de_cuenta,)
        )
        row = cursor.fetchone()

        if not row:
            raise ValueError(f"Cuenta {de_cuenta} no existe")

        saldo_actual = row[0]
        version_actual = row[1]

        if saldo_actual < cantidad:
            raise ValueError("Saldo insuficiente")

        # Actualizar con verificación de versión
        result = conn.execute("""
            UPDATE cuentas
            SET saldo = saldo - ?, version = version + 1
            WHERE id = ? AND version = ?
        """, (cantidad, de_cuenta, version_actual))

        if result.rowcount == 0:
            raise ValueError("Conflicto de concurrencia - reintentar")

        # Actualizar cuenta destino
        conn.execute(
            "UPDATE cuentas SET saldo = saldo + ? WHERE id = ?",
            (cantidad, a_cuenta)
        )

        print(f"✓ Transferido {cantidad} de cuenta {de_cuenta} a {a_cuenta}")

# Versión con SQLAlchemy (mejor para producción)
from sqlalchemy import create_engine, Column, Integer, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.pool import QueuePool

Base = declarative_base()

class Cuenta(Base):
    __tablename__ = 'cuentas'

    id = Column(Integer, primary_key=True)
    saldo = Column(Float, nullable=False)
    version = Column(Integer, default=0)

# Configurar engine con pool
engine = create_engine(
    'sqlite:///database_sqlalchemy.db',
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,  # Verificar conexiones antes de usar
    poolclass=QueuePool
)

# Session factory thread-safe
SessionFactory = scoped_session(sessionmaker(bind=engine))

def transferir_con_sqlalchemy(de_cuenta, a_cuenta, cantidad):
    """Transferencia usando SQLAlchemy ORM"""

    session = SessionFactory()

    try:
        # Bloqueo pesimista
        cuenta_origen = session.query(Cuenta).filter_by(
            id=de_cuenta
        ).with_for_update().first()

        cuenta_destino = session.query(Cuenta).filter_by(
            id=a_cuenta
        ).with_for_update().first()

        if cuenta_origen.saldo >= cantidad:
            cuenta_origen.saldo -= cantidad
            cuenta_destino.saldo += cantidad

            session.commit()
            print(f"✓ SQLAlchemy: Transferido {cantidad}")
        else:
            session.rollback()
            print("✗ Saldo insuficiente")

    except Exception as e:
        session.rollback()
        print(f"Error: {e}")
    finally:
        session.close()

# Async con asyncpg (PostgreSQL)
import asyncio
import asyncpg

class PoolAsync:
    """Pool de conexiones asíncronas"""

    def __init__(self):
        self.pool = None

    async def inicializar(self):
        self.pool = await asyncpg.create_pool(
            'postgresql://usuario:password@localhost/db',
            min_size=2,
            max_size=10
        )

    async def transferir_async(self, de_cuenta, a_cuenta, cantidad):
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Todo en una transacción
                saldo = await conn.fetchval(
                    "SELECT saldo FROM cuentas WHERE id = $1 FOR UPDATE",
                    de_cuenta
                )

                if saldo >= cantidad:
                    await conn.execute(
                        "UPDATE cuentas SET saldo = saldo - $1 WHERE id = $2",
                        cantidad, de_cuenta
                    )
                    await conn.execute(
                        "UPDATE cuentas SET saldo = saldo + $1 WHERE id = $2",
                        cantidad, a_cuenta
                    )
                    return True
                return False

# Test de integridad
inicializar_db()

threads = []
for _ in range(20):
    t = threading.Thread(
        target=transferir_dinero_seguro,
        args=(
            random.randint(1, 4),
            random.randint(1, 4),
            random.randint(10, 50)
        )
    )
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# Verificar integridad
with pool.obtener_conexion() as conn:
    cursor = conn.execute("SELECT SUM(saldo) FROM cuentas")
    total = cursor.fetchone()[0]
    print(f"Saldo total final: {total} (debe ser 4000)")

Conclusión: Las reglas que debes tatuar en tu mente

Si solo recuerdas 7 cosas, que sean estas:

  1. Para peticiones HTTP: usa asyncio + aiohttp, no threads
  2. Para archivos: usa generadores o asyncio, no threads
  3. Para GUIs: usa colas o señales, nunca actualices desde threads
  4. Para logs: usa QueueHandler o logging estructurado
  5. Para cache: usa lru_cache o implementaciones lock-free
  6. Para comandos: usa asyncio.subprocess o ProcessPoolExecutor
  7. Para bases de datos: usa connection pools o sesiones thread-local

La regla de oro

Antes de crear un thread, hazte estas 3 preguntas:

  1. ¿Puedo hacerlo con asyncio? (90% de las veces, sí)
  2. ¿Necesito paralelismo real? (usa multiprocessing)
  3. ¿Es I/O bloqueante sin alternativa async? (OK, usa threads)

Recursos para no cagarla

Recuerda: el mejor código con threads es el que no tiene threads.

Ahora ve y refactoriza ese código lleno de threads que tienes en producción. Tu yo del futuro te lo agradecerá cuando no tenga que debuggear race conditions a las 3 AM.


¿Cómo has llegado hasta aquí?

Este artículo se envió a la lista de email como respuesta a una pregunta frecuente.

Puedes apuntarte a la lista de email aquí: vamosallio.com