3 Jan 2022

Elixir: spawn, messages, Async, Await

spawn processes

Elixir programs typically run many lightweight processes concurrently. When writing a program, you can spawn a process that is isolated from other processes (the Erlang VM's schedulers manage CPU time for these processes). Each process has its own mailbox, and processes communicate with one another by sending messages to these mailboxes.

Here, the long_process function is going to take 5 seconds to complete. By wrapping it in a spawn, the rest of the code can keep executing.

long_process = fn -> :timer.sleep(5000) end
pid = spawn(fn -> long_process.() end)
# rest of code
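To check that spawn really does return right away, here is a minimal sketch. The shorter 500 ms sleep and the names started and elapsed_ms are assumptions made for illustration. It times the call to spawn and asks whether the new process is still running.

```elixir
# A shorter sleep than in the example above, just to keep the demo quick
long_process = fn -> :timer.sleep(500) end

started = System.monotonic_time(:millisecond)
pid = spawn(fn -> long_process.() end)
elapsed_ms = System.monotonic_time(:millisecond) - started

# spawn/1 hands back a pid without waiting for the function to finish,
# so elapsed_ms should be far below the 500 ms sleep
IO.puts("spawn returned after #{elapsed_ms} ms")

# The spawned process is still sleeping at this point
IO.inspect(Process.alive?(pid))
```

Once the function passed to spawn returns, the process exits and Process.alive?/1 reports false for that pid.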

send messages

Using send, we're able to send a message to another process. send/2 receives a process id as the first argument, and the message to send as the second, and returns the message. In the following, a process is sending itself a message.

self() # returns process id

send(self(), "Hello")

# shows info about the unread messages in mailbox {:messages, ["Hello"]}
Process.info(self(), :messages)
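As a small sketch of the two points above (send/2 returns its message, and messages wait in the mailbox until they are read), the following sends two tagged tuples. The :greeting tag and the variable names are made up for the example.

```elixir
# send/2 returns the message it was given
returned = send(self(), {:greeting, "Hello"})
IO.inspect(returned)

send(self(), {:greeting, "World"})

# Unread messages stay queued in the order they were sent
{:messages, queued} = Process.info(self(), :messages)
IO.inspect(queued)
```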

receive messages

The :messages entry returned by Process.info/2 lists every unread message in the process's mailbox. To read a message, we use receive. receive is a synchronous, blocking call that uses pattern matching to wait for a message. In the following, the clause message -> IO.inspect message matches anything; in our case that's the string "Hello".

self() # returns process id

send(self(), "Hello")

# shows info about the unread messages in mailbox {:messages, ["Hello"]}
IO.inspect(Process.info(self(), :messages))

receive do
  message -> IO.inspect message
end

# now the mailbox is empty {:messages, []}
IO.inspect(Process.info(self(), :messages))
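Because receive pattern matches, it can also pick out one specific message and leave the others alone. Here is a rough sketch, using hypothetical :ignored and :greeting tags that are not part of the example above:

```elixir
send(self(), {:ignored, 1})
send(self(), {:greeting, "Hello"})

# Only the {:greeting, _} tuple matches, so the earlier message is skipped
text =
  receive do
    {:greeting, value} -> value
  end

IO.inspect(text)

# The message that did not match is still in the mailbox
{:messages, remaining} = Process.info(self(), :messages)
IO.inspect(remaining)
```

A catch-all clause such as message -> ... would instead take whichever message arrived first.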

In the above, we sent the message before calling receive, so it was already in the mailbox and receive read it immediately. But if the message comes from a process that takes a while to complete, receive blocks until the message arrives, and only then is "after" printed.


pid = self()
long_process = fn -> :timer.sleep(3000) end

spawn(fn ->
  long_process.()
  send(pid, "Hello")
end)

IO.puts("before")

receive do
  message -> IO.puts message
end

IO.puts("after")
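One way to see the blocking more directly is to time the receive. This is only a sketch: the 300 ms delay, the :done message, and the variable names are assumptions chosen to keep the demo short.

```elixir
parent = self()

# The spawned process waits a bit before replying
spawn(fn ->
  :timer.sleep(300)
  send(parent, :done)
end)

started = System.monotonic_time(:millisecond)

# Blocks here until :done lands in the mailbox
receive do
  :done -> :ok
end

waited_ms = System.monotonic_time(:millisecond) - started
IO.puts("receive blocked for about #{waited_ms} ms")
```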

receive infinitely

A receive block handles only the first message that matches one of its clauses, then execution moves on. To handle several messages, we'd need several receive blocks, or a function that calls receive in a loop.
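A quick sketch of that behaviour, assuming the mailbox is empty to begin with (the :first and :second messages are placeholders):

```elixir
send(self(), :first)
send(self(), :second)

# One receive consumes exactly one message
one =
  receive do
    message -> message
  end

IO.inspect(one)

# The second message is still waiting in the mailbox
{:messages, left} = Process.info(self(), :messages)
IO.inspect(left)

# A second receive is needed to read it
two =
  receive do
    message -> message
  end

IO.inspect(two)
```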

Here's an example of a looping process that listens for messages indefinitely.

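defmodule Person do
  # Starts a process that holds the person map and returns its pid.
  def on(person) do
    spawn(fn -> listen(person) end)
  end

  # Waits for one message, then recurses with the (possibly updated) map.
  defp listen(person) do
    psn =
      receive do
        {:hobby, value} ->
          %{person | hobby: value}

        {:name, value} ->
          # Simulate a slow update
          :timer.sleep(2000)
          %{person | name: value}

        # send/2 returns its message, so psn is the unchanged person
        {:display, caller} -> send(caller, person)
      end

    # IO.inspect(psn)
    listen(psn)
  end
end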

Let's break it down. Inside on we spawn a new process so that another process calling Person.on/1 would continue execution.

The spawned process calls listen/1, which uses receive to wait for a few different message types. When it receives a match, the value is assigned to psn (in the case of {:display, caller} -> send(caller, person), send returns its message, so person is still assigned to psn). Then, it calls listen(psn) to wait for the next message.

We can send messages to a Person with something like the following:

# Double quotes create strings; single quotes would create charlists
person = Person.on(%{name: "Evan", hobby: "Fishing"})
send(person, {:hobby, "Sports"})
send(person, {:name, "Horatio"})
send(person, {:display, self()})
send(person, {:name, "Jamal"})
send(person, {:hobby, "Food"})

receive do
  value ->
    IO.puts("Received a message:")
    IO.inspect(value)
after
  5000 -> IO.puts("No message received")
end

Each message is a tuple, which is pattern matched in listen/1. A :hobby message updates the map, a :name message updates it after a 2-second delay, and a :display message sends the current Person map back to the calling process. That calling process has its own receive, which is waiting for that reply. Because the Person process handles its mailbox in order, the reply reflects only the first two updates: the name is Horatio and the hobby is Sports.

The after clause sets a timeout on receive. If no matching message arrives within 5 seconds, receive stops waiting and runs the after body instead. You can verify this by removing send(person, {:display, self()}), or, since the :name message takes 2 seconds to process, by lowering the timeout to after 1000 -> IO.puts "No message received".
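Here is the timeout in isolation, as a minimal sketch. The {:reply, value} pattern and the 200 ms timeout are assumptions for the example; nothing ever sends a matching message, so the after body is what runs.

```elixir
result =
  receive do
    # No process sends a {:reply, _} tuple in this sketch
    {:reply, value} -> {:received, value}
  after
    # Give up waiting after 200 ms and return this value instead
    200 -> :timeout
  end

IO.inspect(result)
```

Like any other clause, the after body supplies the value of the whole receive expression, which is why result can be bound directly.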