Playing with Ruby Ractors
Exploring Ractors with Ruby 3.0.
Ractors are the new and exciting parallel execution model in Ruby 3. For the first time, they make it possible to use all your processing cores with standard CRuby without having to use multiple processes.
I needed to solve a little real-world problem; here's what I learned in the process.
Pushing and Pulling Messages
Ractors communicate by sending messages rather than calling methods. Each Ractor has an inbox and an outbox and there are two different ways they communicate.
Pushing Messaging
Messages can be pushed to a Ractor. The receiver will sit and block until a message arrives. When one does, it will wake and process the message. If it can't process the message right away, messages will queue up until it can deal with them. It has a nice big 'in-tray' and will plow through its backlog as and when it gets around to it.
receiver = Ractor.new do
  message = Ractor.receive
end
Messages are sent to the Ractor by calling its send method:
receiver.send('I have some work for you')
Pulling Messaging
Messages can be pulled from a Ractor. The Ractor wanting to send a message will sit and wait until someone takes the message from it. I think of it as having no 'out-tray': it has to wait until the message in its hand has been picked up before it can do any more work.
sender = Ractor.new do
  Ractor.yield 'hello from your favorite ractor'
end
Messages are collected from the sender by the take method:
sender.take
# 'hello from your favorite ractor'
The take method will block until the sender Ractor is ready to give it a message.
Pipes
There's often going to be a Ractor that needs to coordinate the two types of messaging, perhaps to act as a queue in which to dump a load of work to be picked up by a pool of worker Ractors later. These combine the two paradigms and are often called pipes.
pipe = Ractor.new do
  # Loop so the pipe keeps forwarding messages instead of stopping after one
  loop do
    Ractor.yield(Ractor.receive)
  end
end
There's not much more to them. They have a big 'in-tray' for a queue of work to build up in them and will send it on to whoever wants and is ready to take the message from them.
Waiting
You are going to have a bunch of Ractors running at some point and you are going to need to ask if any of them have finished.
# bunch_of_ractors is an array of Ractors
ractor, message = Ractor.select(*bunch_of_ractors)
The select method will block until one of the Ractors has a message to send to you. It returns the Ractor with the message and the message itself. It's the same as taking the message, and that Ractor is then free to terminate or carry on processing.
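Here's a minimal sketch of waiting on several Ractors with select. The three Ractors and their sleep times are invented for the example. Each one finishes by returning a string, so once a Ractor has been selected it is done, and I drop it from the array before selecting again.

```ruby
# Three short-lived Ractors that finish at different times.
ractors = 3.times.map do |i|
  Ractor.new(i) do |n|
    sleep(0.1 * n) # pretend to do some work
    "ractor #{n} finished"
  end
end

results = []
until ractors.empty?
  # Blocks until any one of the remaining Ractors has a message.
  ractor, message = Ractor.select(*ractors)
  ractors.delete(ractor) # it has terminated, so stop waiting on it
  results << message
end

pp results
```

The order of results depends on which Ractor finishes first, so don't rely on it matching the order of the array.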
My Problem
So, to my problem. I want to shunt a lot of work to a pool of Ractors to chew through. I know how many items of work that is going to be.
Let's start with the pipe:
pipe = Ractor.new do
  # Loop so the pipe can pass along every item, not just the first
  loop do
    Ractor.yield(Ractor.receive)
  end
end
Let's now put a lot of work into the pipe (ok, we're just sending it an integer here, but you get the point):
100.times do |x|
  pipe.send(x)
end
Exactly the same as the example above. Now let's create some workers. One advantage of Ractors seems to be the sheer number of them you can create, a lot more than the number of threads in a language like Java and way, way more than the number of processes. That said, I'm being very conservative here.
WORKER_COUNT = 10
workers = WORKER_COUNT.times.map do
  Ractor.new(pipe) do |pipe|
    # Take a message from the pipe
    while msg = pipe.take
      sleep rand(5) # pretend to do some work
      Ractor.yield "I have finished with #{msg}"
    end
  end
end
So now there is an array of 10 workers greedily taking from the pipe but when they've finished their work, they'll be left with their message in their hand and nobody to give it to. We need someone waiting and ready to collect it.
output_collector = Ractor.new(workers) do |workers|
  result = 100.times.map do
    ractor, value = Ractor.select(*workers)
    [ractor, value]
  end
  Ractor.yield(result)
end
In the above, the output_collector will spin around 100 times (remember, I'm lucky enough to know how much work I put on the pipe, forgive the magic number a moment). It will wait for exactly 100 responses from the bunch of worker Ractors, building up an array of results as it goes. When all 100 responses are collected, it will be ready to hand that collection on to someone ready to take it off its hands.
All we now need is:
output_collector.take
Let's glue the whole thing together:
WORKER_COUNT = 10

# Create the queue
pipe = Ractor.new do
  loop do
    Ractor.yield(Ractor.receive)
  end
end

workers = WORKER_COUNT.times.map do
  Ractor.new(pipe) do |pipe|
    # Take a message from the queue
    while msg = pipe.take
      sleep rand(5) # pretend to do some work
      Ractor.yield "I have finished with #{msg}"
    end
  end
end

# Collect exactly as many results as there are items of work
output_collector = Ractor.new(workers) do |workers|
  result = 100.times.map do
    ractor, value = Ractor.select(*workers)
    [ractor, value]
  end
  Ractor.yield(result)
end

# Put the work on the queue
100.times do |x|
  pipe.send(x)
  sleep rand
end

pp output_collector.take