Playing with Ruby Ractors

Exploring Ractors with Ruby 3.0.

Ractors are the new and exciting parallel execution pattern in Ruby 3 and for the first time it makes it possible to use all your processing cores with the standard CRuby without having to use multiple processes.

I needed to solve a little real-world problem, here's what I learned in the process.

Pushing and Pulling Messages

Ractors communicate by sending messages rather than calling methods. Each Ractor has an inbox and an outbox and there are two different ways they communicate.

Pushing Messaging

Messages can be pushed to a Ractor. The receiver will sit and block until a message arrives. When one does, it will wake and process the message. If it can't process the message right away, messages will queue up until it can deal with them. It has a nice big 'in-tray' and will plow through its backlog as and when it gets around to it.

receiver = Ractor.new do
  message = Ractor.receive
end

Messages are sent to the Ractor by calling its send method:

receiver.send('I have some work for you')

Pulling Messaging

Messages can be pulled from a Ractor. The Ractor wanting to send a message will sit and wait until someone takes the message from it. I think of it as not having an 'out-tray' and has to wait until the message in its hand has to be picked up before it can do any more work.

sender = Ractor.new do
  Ractor.yield 'hello from your favorite ractor'
end

Messages are collected from the sender by the take method:

sender.take
# 'hello from your favorite ractor'

The take method will block until the sender Ractor is ready to give it a message.

Pipes

There's often going to be a Ractor that needs to coordinate the two types of messaging, perhaps to act as a queue in which to dump a load of work to be picked up by a pool of worker Ractors later. These combine the two paradigms and are often called pipes.

pipe = Reactor.new do
  Ractor.yield(Ractor.receive)
end

There's not much more to them. They have a big 'in-tray' for a queue of work to build up in them and will send it on to whoever wants and is ready to take the message from them.

Waiting

You are going to have a bunch of Ractors running at some point and you are going to need to ask if any of them have finished.

bunch_of_ractors # an array of Ractors
ractor, message = Ractor.select(*bunch_of_ractors)

The select method will block until one of the Ractors has a message to send to you. It returns the Ractor with the message and the message itself. It's the same as taking the message an that Ractor is free to terminate or carry on processing.

My Problem

So, to my problem. I want to shunt a lot of work to a pool of Ractors to chew through. I know how many items of work that is going to be.

Let's start with the pipe:

pipe = Reactor.new do
  Ractor.yield(Ractor.receive)
end

Let's now put a lot of work into the pipe (ok, we're just sending it an integer here, but you get the point):

100.times do |x|
  pipe.send(x)
end

Exactly the same as the example above. Now let's create some workers. One advantage of Ractors seems to be the sheer number of them you can create, a lot more than the number of threads in a language like Java and way, way more than the number of processes. That said, I'm being very conservative here.

WORKER_COUNT = 10

workers = WORKER_COUNT.times.map do
  Ractor.new(pipe) do |pipe|
    # Take a message from the pipe
    while msg = pipe.take
      sleep rand(5) # pretend to do some work
      Ractor.yield "I have finished with #{msg}"
    end
  end
end

So now there is an array of 10 workers greedily taking from the pipe but when they've finished their work, they'll be left with their message in their hand and nobody to give it to. We need someone waiting and ready to collect it.

output_collector = Ractor.new(workers) do |workers|
  result = 100.times.map do
    ractor, value = Ractor.select(*workers)
    [ractor, value]
  end
  Ractor.yield(result)
end

In the above, the output_collector will spin around 100 times (remember, I'm lucky enough to know how much work I put on the pipe, forgive the magic number a moment). It will wait for exactly 100 responses from the bunch of worker Ractors, building up an array of results as it goes. When all 100 responses are collected, it will be ready to hand that collection on to someone ready to take it off its hands.

All we now need is:

output_collector.take

Let's glue the whole thing together:

WORKER_COUNT = 10

# Create the Queue
pipe = Ractor.new do
  loop do
    Ractor.yield(Ractor.receive)
  end
end

workers = WORKER_COUNT.times.map do
  Ractor.new(pipe) do |pipe|
    # Take a ractor from the queue
    while msg = pipe.take
      sleep rand(5)
      Ractor.yield "I have finished with #{msg}"
    end
  end
end

output_collector = Ractor.new(workers) do |workers|
  result = 100.times.map do
    ractor, value = Ractor.select(*workers)
    [ractor, value]
  end
  Ractor.yield(result)
end

100.times do |x|
  pipe.send(x)
  sleep rand
end

pp output_collector.take
Show Comments