Cómo pasar mensajes entre procesos

Con las primitivas send y receive podemos hacer que dos procesos se intercambien información, que es el método de comunicación por defecto en Elixir, teniendo procesos aislados que se comunican mediante un sistema de bandejas de entrada bajo un modelo de actores.

¿Por qué compartir información?

Los procesos o hilos en computación necesitarán compartir información. Por ejemplo, si invocamos diez hilos para poder hacer un procesamiento intenso de manera distribuida, paralela o concurrente, será necesario tanto enviar la información inicial a cada proceso, como que cada proceso reporte su estado o el resultado de su computación.

Típicamente, muchos lenguajes de programación implementan una concurrencia basada en zonas de memoria comunes. Es decir, existe una región de memoria que puede ser accedida de forma simultánea por varios procesos. Sin embargo, uno de los problemas de este sistema es que es necesario que los procesos o hilos hagan un uso correcto de la memoria para no corromperla y afectar al funcionamiento de otros procesos.

Por ejemplo, imaginemos que estamos implementando un progreso como una variable compartida de tipo numérica (una terrible idea, pero suficiente para ilustrar este ejemplo), que va a valer inicialmente 0. Cada vez que un hilo haga una parte del proceso, va a incrementar el valor de esta variable en 1. De este modo, si hemos dividido la tarea en 64 partes, idealmente esa variable debería incrementarse a 64 cuando terminemos de procesar la información. Excepto que es bastante probable que esto no pase así.

Supongamos que como parte de este flujo en el que un hilo consulta lo que vale la variable, incrementa su valor, y la vuelve a colocar, otro hilo hace lo mismo exactamente a la vez, y consulta el valor de la variable antes que el hilo anterior vuelque el nuevo valor. Ese segundo hilo no sabe que hay otro proceso incrementando en este momento, y por lo tanto estará trabajando con información anticuada. Cuando deposite el nuevo valor, no tendrá en cuenta ese otro incremento que se ha dado a la vez.

Esta es la razón por la que hay que usar técnicas de control de concurrencia, como variables atómicas, semáforos, y esas cosas.

¿Cómo se comparte información en un proceso de Elixir?

En el caso de los lenguajes de programación para la BEAM, como Elixir, también será necesario que los procesos que lancemos compartan información entre sí. Sin embargo, Elixir lo trata de una manera diferente a la concurrencia tradicional, que es más segura.

Inicialmente, en BEAM, los procesos son independientes entre sí y no pueden compartir información de forma directa en memoria. Nada de acceder a variables compartidas, nada de buffers. En su lugar, BEAM implementa un sistema de concurrencia basado en actores y paso de mensajes.

Cada actor tendrá como parte de su existencia una bandeja de mensajes en la que recibir mensajes de otros procesos, y a su vez un proceso podrá mandar mensajes a otros procesos. Mediante las primitivas de envío y recibo de mensajes, un proceso le puede pedir a Elixir que mande un mensaje a otro proceso, o que compruebe si hay mensajes nuevos.

Y para ello, un proceso tendrá que conocer exactamente el PID del proceso al que le quiere enviar un mensaje. Ya estudiamos en la lección anterior qué es un PID: es el identificador numérico de cada proceso. Si un proceso tiene una referencia a otro PID, puede mandarle mensajes que quedarán en su bandeja de entrada a la espera de que sean leídos por el proceso receptor.

Cada cola de mensajes es independiente entre sí y única, y además los mensajes son inmutables e independientes. Cuando un proceso manda un mensaje a otro, se clona el mensaje, nunca es una referencia compartida. Así que eso, sumado a que la bandeja de mensajes es personal de cada proceso y por lo tanto no puede ser modificada desde fuera, hace que este modelo de programación sea ordenado (porque la comunicación sólo se hace mediante envío de mensajes) y seguro (porque los mensajes no pueden ser corrompidos cuando hay varios procesos trabajando con información relacionada).

Este modelo de trabajo basado en el envío de mensajes también es lo que favorece el modelo de concurrencia en OTP. Todas estas solicitudes de envío de mensaje o de recepción del siguiente mensaje de la cola es lo que le permite a la OTP interceptar el control del procesador del ordenador o del nodo BEAM y poder hacer su gestión de concurrencia de una manera independiente a la que tendría el sistema operativo. Dicho de otro modo, a la que la OTP recibe una solicitud de envío de mensaje, usará su administrador de procesos interno para hacer un poco de multitasking y dejar que otro proceso diferente haga cálculos. La plataforma va a favorecer, por lo tanto, que los procesos comuniquen con frecuencia, porque esto permitirá a la OTP cambiar con soltura entre procesos. Esto permite que cientos o miles de procesos funcionen en simultáneo, porque si son ricos en operaciones de entrada-salida, permite cambiar rápidamente entre un par de procesos que tengan cómputo que hacer y que por lo tanto no estén bloqueados.

Una forma efectiva de usar el modelo de concurrencia basado en actores sería implementar, por lo tanto, servicios. En vez de invocar funciones sueltas que hagan cálculos, podríamos implementar una función que espere a que le lleguen solicitudes a su bandeja de entrada. Cuando llegue una de estas solicitudes, tomará la coordenada PID del proceso que le mandó la solicitud y le enviará un mensaje con la respuesta de esa solicitud. Un modelo cliente-servidor local a nivel de proceso.

Recepción de mensajes con receive

Vamos a basarnos en la función de saludar:

defmodule Ejemplo do
  def saludar(nombre)
    "¡Hola, " <> nombre <> "!"
  end
end

Utilizaremos la palabra clave receive para pedirle al lenguaje de programación que el proceso obtenga el siguiente mensaje que se deposite en su bandeja de entrada. Esta palabra clave debe llevar adicionalmente un bloque do-end con el procesamiento que queremos que se haga con ese mensaje:

receive do
  # Aquí el código que queremos que se ejecute con el mensaje.
end

Esta palabra clave tiene algunas condiciones de uso:

  • Los mensajes de la bandeja de entrada de un proceso no se eliminarán hasta que sean leídos (salvo que haya un error general). Eso significa que si se acumulan mensajes, se podrá usar receive para procesarlos en bucle, uno a uno. receive sólo procesará un mensaje.
  • receive generalmente se quedará a la espera si todavía no hay mensajes en la bandeja de entrada. El proceso quedará bloqueado hasta que alguien le diga algo, momento en el cual se desbloqueará y procesará prácticamente al instante el mensaje recibido (porque por fin lo tiene).
  • Los mensajes se eliminan de la cola cuando se procesan. Múltiples llamadas a receive irán retornando los sucesivos mensajes que había en la cola, pero sólo los podrá procesar una vez.

La forma de trabajar con el bloque do-end de un receive será emplear pattern matching. Esto es parecido a como funciona un case. Pondremos varios patrones y el primero que coincida será el que evalúe de forma global. En su forma más simple, esto podría ser hecho del siguiente modo:

receive do
  nom -> IO.puts("Hola, " <> nom <> ".")
end

En este caso, cuando se reciba un mensaje, matcheará contra nom, por lo que se evaluará la expresión de código del otro lado de la flecha, provocando que se haga una impresión por pantalla del mensaje especificado.

Sin embargo, en este caso todo lo que debe tener el mensaje es el string que queremos que se imprima. Normalmente querremos pasarle una estructura como un array, una tupla o un mapa, para poder identificar metadatos, tales como el identificador del proceso que mandó el mensaje. En el siguiente ejemplo, el pattern matching extrae automáticamente los elementos PID y nombre del mapa que recibe como mensaje:

receive do
  {pid, nom} -> IO.puts("Hola, " <> nom <> ".")
end

En definitiva, receive nos va a permitir recibir expresiones, que podremos decodificar con pattern matching. Hasta un PID se podrá enviar como parte de un mensaje, que tiene la misma semántica para hacer el pattern matching que otros elementos del lenguaje como un case.

Cómo limitar el tiempo que un proceso puede quedar esperando

Si un proceso hace receive cuando su bandeja de entrada está vacía, se quedará completamente bloqueado hasta que le entre un mensaje que la pueda desbloquear.

Este sistema también es el responsable de que Elixir pueda usar cientos o miles de procesos en una misma máquina virtual BEAM, porque los procesos que están a la espera de otros no necesitan tiempo de CPU y por lo tanto quedan inactivos hasta que su bandeja de entrada tenga algo que la desbloquee.

Sin embargo, si queremos evitar que un proceso quede eternamente congelado, podemos utilizar la palabra clave after como parte de un bloque receive para desbloquear el proceso y especificar un comportamiento propio o una expresión a retornar en caso de que hayan pasado tantos milisegundos como digamos sin que se haya recibido ningún mensaje. Esto nos permite implementar nuestros propios timeouts.

receive do
  {pid, nom} -> IO.puts("Hola, " <> nom <> ".")
  after 1_000 -> IO.puts("No ha llegado nada en 1 segundo")
end

En este caso, a la palabra clave after le acompañan cuántos milisegundos hay que esperar. Si ha pasado esa cantidad de tiempo y seguimos a la espera de un mensaje, inmediatamente se evalúa lo que haya junto al bloque after, y será eso lo que se retorna como parte de la expresión receive.

Podemos usar ese after para devolver valores por defecto o comunicar errores sin que el proceso quede completamente bloqueado de forma infinita.

spawn de un proceso que recibe mensajes

Dado el siguiente módulo:

defmodule Ejemplo do
  def saludar do
    receive do
      nom -> IO.puts("Hola, #{nom}")
    end
  end
end

Ahora podría desde mi REPL volver a hacer un spawn de este proceso usando mi función spawn:

iex(2)> pid = spawn(Ejemplo, :saludar, [])
#PID<0.138.0>

En este caso, el proceso no termina de manera directa. Tampoco hace un bucle infinito, como hacíamos en la lección anterior. Simplemente, se ha quedado bloqueado esperando a que alguien le mande un mensaje, en la llamada al receive. Podemos atestiguar esto viendo que el proceso está vivo:

iex(4)> Process.alive?(pid)
true

Y además, si sacamos la información del proceso, vemos que su estado es waiting (pista: es el campo status del mapa que devuelve la función Process.info/1):

iex(3)> Process.info(pid)
[
  current_function: {Ejemplo, :saludar, 0},
  initial_call: {Ejemplo, :saludar, 0},
  status: :waiting,
  message_queue_len: 0,
  links: [],
  dictionary: [],
  trap_exit: false,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.66.0>,
  total_heap_size: 233,
  heap_size: 233,
  stack_size: 2,
  reductions: 5,
  garbage_collection: [
    max_heap_size: %{error_logger: true, kill: true, size: 0},
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 65535,
    minor_gcs: 0
  ],
  suspending: []
]

Envío de mensajes

Para desbloquear el proceso, podemos enviarle un mensaje. Para ello usamos la función send/2, la cual acepta como primer parámetro el PID del proceso al que le queremos enviar el mensaje, y como segundo parámetro lo que queramos depositar en la bandeja de mensajes del proceso. Esta función ademas devuelve el mismo mensaje que le hemos enviado.

Pongamos que tenemos un proceso cuyo PID está en la variable pid. Inicialmente, el proceso tiene la cola de mensajes vacía:

iex(5)> pid = spawn(Ejemplo, :saludar, [])
#PID<0.142.0>

# Mensajes en la cola: 0

Ahora le vamos a enviar un mensaje con la función send. El mensaje se guardará en la cola:

iex(6)> send(pid, "Joe")
"Joe"

# Mensjaes en la cola: 1
# 0 => "Joe"

Podemos enviarle más mensajes y se irán guardando en la cola.

iex(7)> send(pid, [1, 2, 3, 4])
[1, 2, 3, 4]

# Mensajes en la cola: 2
# 0 => "Joe"
# 1 => [1, 2, 3, 4]

Cuando el proceso llame a la función receive, recibirá el primer mensaje, que será sacado de la cola. El resto de mensajes se quedarán ahí.

# El proceso pid llama a receive. Se procesa el mensaje "Joe".

# Mensajes en la cola: 1
# 1 => [1, 2, 3, 4]

Cuando el proceso vuelva a llamar a receive, recibirá el segundo mensaje:

# El proceso pid llama a receive. Se procesa el mensaje [1, 2, 3, 4].

# Mensajes en la cola: 0

Ejemplo de envío de mensajes

Volvamos a nuestro ejemplo anterior. Ese proceso cuando reciba un mensaje lo imprimirá por pantalla porque llama a IO.puts. Podemos comprobar que ahora hacerle un send al mismo provocará que se escriba por pantalla un mensaje:

iex(6)> send(pid, "Joe")
Hola, Joe
"Joe"

En este caso vemos el mensaje Hola, Joe, que procede del propio proceso una vez que hace su gestión mediante receive del mensaje recibido. El segundo "Joe" que vemos por pantalla se corresponde con el retorno de la función send.

Envío de mensajes de vuelta

Ahora imaginemos que tenemos el siguiente código:

defmodule Ejemplo do
  def saludar do
    receive do
      nom -> "Hola, #{nom}"
    end
  end
end

En este caso, la llamada a receive tiene en su pattern matching una gestión del mensaje que consiste en envolverlo en otro string. Sin embargo, debido a que no se hace ningún efecto colateral (como una llamada a IO.puts), no se va a ver nada cuando se le pase un mensaje.

iex(10)> send(pid, "Joe")
"Joe"

De nuevo, vemos ese "Joe" porque tenemos ahí el retorno de la llamada a send, pero el proceso no habrá comunicado nada al mundo exterior.

Esto lo podemos solucionar mandando un mensaje de vuelta usando la función send() desde el propio bloque receive del proceso. Para ello, además, tendremos que hacer algunos cambios al programa. Por ejemplo, tendremos que aceptar como mensaje una estructura que nos proporcione el PID del proceso remitente, ya que de otro modo no sabríamos a quién responder. Con ese PID, podemos hacer un send dentro del receive:

defmodule Ejemplo do
  def saludar do
    receive do
      {pid, nom} -> send(pid, "Hola, #{nom}")
    end
  end
end

Si ahora le enviamos un mensaje, tendremos que hacerlo con la nueva semántica. Mediante la función self podemos sacar el PID de nuestro proceso. De este modo, el mensaje nos volverá de vuelta.

iex(11)> send(pid, {self(), "Joe"})
{#PID<0.106.0>, "Joe"}

Para ver el mensaje de vuelta, será necesario que el proceso remitente también lo trate mediante un receive. Incluso desde IEx podemos hacer esto:

iex(4)> receive do
...(4)>   x -> IO.puts(x)
...(4)> end
Hola, Joe
:ok

En este caso, el Hola, Joe es el puts de nuestro bloque receive local. Recibimos el string con la respuesta y lo imprimimos. Además, receive evalúa a un :ok como consecuencia del proceso.

Cómo esperar más mensajes

receive va a procesar únicamente un mensaje. Con el código actual, si comprobamos con Process.alive? si el proceso está vivo, nos va a devolver false, debido a que su ejecución ha terminado.

Para procesar más mensajes, tenemos que crear un bucle. La forma más simple de hacer esto sería crear una llamada recursiva al final, para que con optimización de recursividad por cola vuelva a esperar otro mensaje:

defmodule Ejemplo do
  def saludar do
    receive do
      {pid, nom} -> send(pid, "Hola, #{nom}")
    end
    saludar
  end
end

De este modo, si ahora le mandamos múltiples mensajes, todos estos mensajes serán procesados, porque después de un receive se disparará otro.

Lista de reproducción
  1. 1
    Cómo crear procesos
    11 minutos
  2. 2
    Cómo pasar mensajes entre procesos
    13 minutos
  3. 3
    Diccionario de un proceso y mantener un estado
    15 minutos
  4. 4
    Cómo enlazar procesos para detectar fallos
    13 minutos
  5. 5
    Cómo monitorizar procesos
    10 minutos
  6. 6
    ¿Qué es un GenServer?
    15 minutos
  7. 7
    Cómo enviar mensajes con un GenServer
    19 minutos
  8. 8
    Control de errores y gestión de un GenServer
    19 minutos
  9. 9
    Cómo renombrar procesos
    11 minutos
  10. 10
    Cómo crear un Supervisor Tree
    17 minutos
  11. 11
    Estrategias para trabajar con Supervisor
    18 minutos
  12. 12
    Estrategias para crear un Supervisor
    11 minutos
  13. 13
    Resumen sobre procesos OTP
    12 minutos
  14. 14
    Cómo usar Application
    12 minutos
  15. 15
    Ejemplo de Application con hijos
    14 minutos