class Ractor
Ractor is an Actor-model abstraction for Ruby that provides thread-safe parallel execution.
Ractor.new creates a new Ractor, which can run in parallel.

    # The simplest ractor
    r = Ractor.new { puts "I am in Ractor!" }
    r.take # wait for it to finish
    # Here, "I am in Ractor!" is printed
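Ractors do not share all objects with each other. This has two main benefits: thread-safety problems such as data races and race conditions cannot occur between ractors, and ractors can run in parallel.
To achieve this, object sharing across ractors is limited. For example, unlike threads, ractors cannot access all the objects available in other ractors. Even objects normally available through variables in the outer scope are prohibited from being used across ractors.

    a = 1
    r = Ractor.new { puts "I am in Ractor! a=#{a}" }
    # fails immediately with
    # ArgumentError (can not isolate a Proc because it accesses outer variables (a).)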
The object must be shared explicitly:

    a = 1
    r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}" }
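On CRuby (the default implementation), the Global Virtual Machine Lock (GVL) is held per ractor, so ractors can run in parallel without locking each other. This is unlike the situation with threads on CRuby.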
Instead of accessing shared state, objects should be passed to and from ractors by sending and receiving them as messages.

    a = 1
    r = Ractor.new do
      a_in_ractor = receive # receive blocks until somebody passes a message
      puts "I am in Ractor! a=#{a_in_ractor}"
    end
    r.send(a)  # pass it
    r.take
    # Here, "I am in Ractor! a=1" is printed
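There are two pairs of methods for sending and receiving messages:
- Ractor#send and Ractor.receive, for when the sender knows the receiver (push);
- Ractor.yield and Ractor#take, for when the receiver knows the sender (pull).
In addition, any arguments passed to Ractor.new are passed to the block and are available there as if received by Ractor.receive, and the last block value is sent outside of the ractor as if sent by Ractor.yield.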
A demonstration of the classic ping-pong:

    server = Ractor.new(name: "server") do
      puts "Server starts: #{self.inspect}"
      puts "Server sends: ping"
      Ractor.yield 'ping'                       # The server doesn't know the receiver and sends to whoever is interested
      received = Ractor.receive                 # The server doesn't know the sender and receives from whoever sent
      puts "Server received: #{received}"
    end

    client = Ractor.new(server) do |srv|        # The server is sent to the client, and available as srv
      puts "Client starts: #{self.inspect}"
      received = srv.take                       # The client takes a message from the server
      puts "Client received from " \
           "#{srv.inspect}: #{received}"
      puts "Client sends to " \
           "#{srv.inspect}: pong"
      srv.send 'pong'                           # The client sends a message to the server
    end

    [client, server].each(&:take)               # Wait until they both finish

This will output something like:

    Server starts: #<Ractor:#2 server test.rb:1 running>
    Server sends: ping
    Client starts: #<Ractor:#3 test.rb:8 running>
    Client received from #<Ractor:#2 server test.rb:1 blocking>: ping
    Client sends to #<Ractor:#2 server test.rb:1 blocking>: pong
    Server received: pong
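Ractors receive their messages via the *incoming port* and send them via the *outgoing port*. Either port can be disabled with Ractor#close_incoming and Ractor#close_outgoing, respectively. When a ractor terminates, its ports are closed automatically.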
Shareable and unshareable objects
When an object is sent to or from a ractor, it is important to understand whether the object is shareable or unshareable. Most Ruby objects are unshareable. Even frozen objects can be unshareable if they contain (through their instance variables) unfrozen objects.
Shareable objects are those which can be used by several threads without compromising thread-safety, for example numbers, true and false. Ractor.shareable? lets you check this, and Ractor.make_shareable tries to make the object shareable if it is not already, raising an error if it cannot.

    Ractor.shareable?(1)            #=> true -- numbers and other immutable basic values are shareable
    Ractor.shareable?('foo')        #=> false, unless the string is frozen due to # frozen_string_literal: true
    Ractor.shareable?('foo'.freeze) #=> true
    Ractor.shareable?([Object.new].freeze) #=> false, inner object is unfrozen

    ary = ['hello', 'world']
    ary.frozen?                 #=> false
    ary[0].frozen?              #=> false
    Ractor.make_shareable(ary)
    ary.frozen?                 #=> true
    ary[0].frozen?              #=> true
    ary[1].frozen?              #=> true
When a shareable object is sent (via send or Ractor.yield), no additional processing is applied to it. It simply becomes usable by both ractors. When an unshareable object is sent, it can be either *copied* or *moved*. Copying is the default: the object is fully copied by deep cloning (Object#clone) the unshareable parts of its structure.

    data = ['foo', 'bar'.freeze]
    r = Ractor.new do
      data2 = Ractor.receive
      puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}"
    end
    r.send(data)
    r.take
    puts "Outside  : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}"
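This will output something like:

    In ractor: 340, 360, 320
    Outside  : 380, 400, 320

Note that the object ids of the array and of the unfrozen string inside it differ in the ractor, because they are different objects there. The array's second element, a shareable frozen string, is the same object in both places.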
Deep cloning of objects may be slow, and sometimes impossible. Alternatively, move: true may be used when sending. This *moves* the unshareable object to the receiving ractor, making it inaccessible to the sending ractor.

    data = ['foo', 'bar']
    r = Ractor.new do
      data_in_ractor = Ractor.receive
      puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}"
    end
    r.send(data, move: true)
    r.take
    puts "Outside: moved? #{Ractor::MovedObject === data}"
    puts "Outside: #{data.inspect}"

This will output:

    In ractor: 100, 120
    Outside: moved? true
    test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError)

Notice that even inspect (and more basic methods like __id__) is inaccessible on a moved object.
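Class and Module objects are shareable, so class/module definitions are shared between ractors. Ractor objects are also shareable. All operations on shareable objects are thread-safe, so the thread-safety property is preserved. Mutable shareable objects cannot be defined in Ruby, but C extensions can introduce them.
It is prohibited to access (get) instance variables of shareable objects from other ractors if the values of those variables are not shareable. This can happen because modules/classes are shareable, but they can have instance variables whose values are not. In non-main ractors, it is also prohibited to set instance variables on classes/modules (even if the value is shareable).

    class C
      class << self
        attr_accessor :tricky
      end
    end

    C.tricky = "unshareable".dup

    r = Ractor.new(C) do |cls|
      puts "I see #{cls}"
      puts "I can't see #{cls.tricky}"
      cls.tricky = true # doesn't get here, but this would also raise an error
    end
    r.take
    # I see C
    # can not access instance variables of classes/modules from non-main Ractors (RuntimeError)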
Ractors can access constants if they are shareable. The main Ractor is the only one that can access non-shareable constants.

    GOOD = 'good'.freeze
    BAD = 'bad'.dup

    r = Ractor.new do
      puts "GOOD=#{GOOD}"
      puts "BAD=#{BAD}"
    end
    r.take
    # GOOD=good
    # can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError)

    # Consider the same C class from above

    r = Ractor.new do
      puts "I see #{C}"
      puts "I can't see #{C.tricky}"
    end
    r.take
    # I see C
    # can not access instance variables of classes/modules from non-main Ractors (RuntimeError)

See also the description of the # shareable_constant_value pragma in the Comments syntax explanation.
Ractors vs. threads
Each ractor has its own main Thread. New threads can be created from inside ractors (and, on CRuby, they share the GVL with the other threads of that ractor).

    r = Ractor.new do
      a = 1
      Thread.new { puts "Thread in ractor: a=#{a}" }.join
    end
    r.take
    # Here "Thread in ractor: a=1" will be printed
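Note on code examples
In the examples below, we sometimes use the following method to wait for ractors that are not currently blocked to finish (or to make progress).

    def wait
      sleep(0.1)
    end

It is **only for demonstration purposes** and should not be used in real code. Most of the time, take is used to wait for ractors to finish.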
Reference
See the Ractor design doc for more details.
Public Class Methods
Returns the number of ractors currently running or blocking (waiting).

    Ractor.count                   #=> 1
    r = Ractor.new(name: 'example') { Ractor.yield(1) }
    Ractor.count                   #=> 2 (main + example ractor)
    r.take                         # wait for Ractor.yield(1)
    r.take                         # wait until r finishes
    Ractor.count                   #=> 1

    # File ractor.rb, line 302
    def self.count
      __builtin_cexpr! %q{
        ULONG2NUM(GET_VM()->ractor.cnt);
      }
    end
Returns the currently executing Ractor.

    Ractor.current #=> #<Ractor:#1 running>

    # File ractor.rb, line 288
    def self.current
      __builtin_cexpr! %q{
        rb_ractor_self(rb_ec_ractor_ptr(ec));
      }
    end
Returns the main ractor.

    # File ractor.rb, line 848
    def self.main
      __builtin_cexpr! %q{
        rb_ractor_self(GET_VM()->ractor.main_ractor);
      }
    end
Creates a new Ractor with args and a block.
The given block (Proc) will be isolated (it cannot access any outer variables). self inside the block refers to the current Ractor.

    r = Ractor.new { puts "Hi, I am #{self.inspect}" }
    r.take
    # Prints "Hi, I am #<Ractor:#2 test.rb:1 running>"

Any args passed are propagated to the block arguments by the same rules as objects sent via send/Ractor.receive. If an argument in args is not shareable, it will be copied (via deep cloning, which might be inefficient).

    arg = [1, 2, 3]
    puts "Passing: #{arg} (##{arg.object_id})"
    r = Ractor.new(arg) {|received_arg|
      puts "Received: #{received_arg} (##{received_arg.object_id})"
    }
    r.take
    # Prints:
    #   Passing: [1, 2, 3] (#280)
    #   Received: [1, 2, 3] (#300)

A ractor's name can be set for debugging purposes:

    r = Ractor.new(name: 'my ractor') {}; r.take
    p r
    #=> #<Ractor:#3 my ractor test.rb:1 terminated>

    # File ractor.rb, line 273
    def self.new(*args, name: nil, &block)
      b = block # TODO: builtin bug
      raise ArgumentError, "must be called with a block" unless block
      if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)")
        warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \
             "Also there are many implementation issues.", uplevel: 0, category: :experimental)
      end
      loc = caller_locations(1, 1).first
      loc = "#{loc.path}:#{loc.lineno}"
      __builtin_ractor_create(loc, name, args, b)
    end
Receives a message from the incoming port of the current ractor (which was sent there by send from another ractor).

    r = Ractor.new do
      v1 = Ractor.receive
      puts "Received: #{v1}"
    end
    r.send('message1')
    r.take
    # Here will be printed: "Received: message1"

Alternatively, the private instance method receive may be used:

    r = Ractor.new do
      v1 = receive
      puts "Received: #{v1}"
    end
    r.send('message1')
    r.take
    # This prints: "Received: message1"

The method blocks if the queue is empty.

    r = Ractor.new do
      puts "Before first receive"
      v1 = Ractor.receive
      puts "Received: #{v1}"
      v2 = Ractor.receive
      puts "Received: #{v2}"
    end
    wait
    puts "Still not received"
    r.send('message1')
    wait
    puts "Still received only one"
    r.send('message2')
    r.take

Output:

    Before first receive
    Still not received
    Received: message1
    Still received only one
    Received: message2

If close_incoming was called on the ractor, the method raises Ractor::ClosedError when there are no more messages in the incoming queue:

    Ractor.new do
      close_incoming
      receive
    end
    wait
    # in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError)

    # File ractor.rb, line 430
    def self.receive
      __builtin_cexpr! %q{
        ractor_receive(ec, rb_ec_ractor_ptr(ec))
      }
    end
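Receives only a specific message.
Unlike Ractor.receive, Ractor.receive_if takes a block containing a pattern (or any filter), which selects which of the messages available in the ractor's incoming queue to accept.

    r = Ractor.new do
      p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3"
      p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1"
      p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2"
    end
    r << "bar1"
    r << "baz2"
    r << "foo3"
    r.take

This will output:

    "foo3"
    "bar1"
    "baz2"

If the block returns a truthy value, the message is removed from the incoming queue and returned. Otherwise, the message remains in the incoming queue and the block is tried against the next message.
If there are no messages left in the incoming queue, the method blocks until new messages arrive.
If the block is exited via break/return/exception/throw, the message is removed from the incoming queue as if a truthy value had been returned.

    r = Ractor.new do
      val = Ractor.receive_if{|msg| msg.is_a?(Array)}
      puts "Received successfully: #{val}"
    end

    r.send(1)
    r.send('test')
    wait
    puts "2 non-matching sent, nothing received"
    r.send([1, 2, 3])
    wait

Prints:

    2 non-matching sent, nothing received
    Received successfully: [1, 2, 3]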
Note that you cannot call receive/receive_if recursively inside the given block. The block should do nothing other than filter messages.

    Ractor.current << true
    Ractor.receive_if{|msg| Ractor.receive}
    #=> `receive': can not call receive/receive_if recursively (Ractor::Error)

    # File ractor.rb, line 509
    def self.receive_if &b
      Primitive.ractor_receive_if b
    end
Waits for any ractor to have something in its outgoing port, reads from that ractor, and then returns the ractor and the object received.

    r1 = Ractor.new {Ractor.yield 'from 1'}
    r2 = Ractor.new {Ractor.yield 'from 2'}

    r, obj = Ractor.select(r1, r2)

    puts "received #{obj.inspect} from #{r.inspect}"
    # Prints: received "from 1" from #<Ractor:#2 test.rb:1 running>
    # But could just as well print "from 2" here; either ractor could be selected first.
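A common use of Ractor.select is to collect results from several workers in whatever order they finish. The following is a minimal sketch, assuming a Ruby version in which Ractor.select and Ractor#take behave as documented here; the workers and the squaring task are illustrative.

```ruby
# Three workers, each returning the square of its argument as its final value.
workers = (1..3).map { |i| Ractor.new(i) { |n| n * n } }

results = []
until workers.empty?
  # Blocks until any of the remaining workers has a value available.
  finished, value = Ractor.select(*workers)
  workers.delete(finished)   # each worker produces exactly one value
  results << value
end

# Completion order is not deterministic, so sort before using the results.
squares = results.sort
p squares
```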
If one of the given ractors is the current ractor, and it is selected, r will contain the :receive symbol instead of the ractor object.

    r1 = Ractor.new(Ractor.current) do |main|
      main.send 'to main'
      Ractor.yield 'from 1'
    end
    r2 = Ractor.new do
      Ractor.yield 'from 2'
    end

    r, obj = Ractor.select(r1, r2, Ractor.current)
    puts "received #{obj.inspect} from #{r.inspect}"
    # Could print: received "to main" from :receive

If yield_value is provided, that value may be yielded if another ractor is calling take. In this case, the pair [:yield, nil] is returned:

    r1 = Ractor.new(Ractor.current) do |main|
      puts "Received from main: #{main.take}"
    end

    puts "Trying to select"
    r, obj = Ractor.select(r1, Ractor.current, yield_value: 123)
    wait
    puts "Received #{obj.inspect} from #{r.inspect}"

This will print:

    Trying to select
    Received from main: 123
    Received nil from :yield

The move boolean flag defines whether the yielded value is copied (the default) or moved.

    # File ractor.rb, line 358
    def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
      raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty?

      if ractors.delete Ractor.current
        do_receive = true
      else
        do_receive = false
      end

      __builtin_ractor_select_internal ractors, do_receive, !yield_unspecified, yield_value, move
    end
Sends a message to the current ractor's outgoing port, to be accepted by take.

    r = Ractor.new {Ractor.yield 'Hello from ractor'}
    puts r.take
    # Prints: "Hello from ractor"

This method is blocking and returns only when somebody consumes the sent message.

    r = Ractor.new do
      Ractor.yield 'Hello from ractor'
      puts "Ractor: after yield"
    end
    wait
    puts "Still not taken"
    puts r.take

This will print:

    Still not taken
    Hello from ractor
    Ractor: after yield

If the outgoing port was closed with close_outgoing, the method raises Ractor::ClosedError:

    r = Ractor.new do
      close_outgoing
      Ractor.yield 'Hello from ractor'
    end
    wait
    # `yield': The outgoing-port is already closed (Ractor::ClosedError)

The meaning of the move argument is the same as for send.

    # File ractor.rb, line 643
    def self.yield(obj, move: false)
      __builtin_cexpr! %q{
        ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move)
      }
    end
Public Instance Methods
Gets a value from ractor-local storage.

    # File ractor.rb, line 838
    def [](sym)
      Primitive.ractor_local_value(sym)
    end

Sets a value in ractor-local storage.

    # File ractor.rb, line 843
    def []=(sym, val)
      Primitive.ractor_local_value_set(sym, val)
    end
Closes the incoming port and returns whether it was already closed. All further attempts to Ractor.receive in the ractor, and to send to the ractor, will fail with Ractor::ClosedError.

    r = Ractor.new {sleep(500)}
    r.close_incoming  #=> false
    r.close_incoming  #=> true
    r.send('test')
    # Ractor::ClosedError (The incoming-port is already closed)

    # File ractor.rb, line 749
    def close_incoming
      __builtin_cexpr! %q{
        ractor_close_incoming(ec, RACTOR_PTR(self));
      }
    end
Closes the outgoing port and returns whether it was already closed. All further attempts to Ractor.yield in the ractor, and to take from the ractor, will fail with Ractor::ClosedError.

    r = Ractor.new {sleep(500)}
    r.close_outgoing  #=> false
    r.close_outgoing  #=> true
    r.take
    # Ractor::ClosedError (The outgoing-port is already closed)

    # File ractor.rb, line 767
    def close_outgoing
      __builtin_cexpr! %q{
        ractor_close_outgoing(ec, RACTOR_PTR(self));
      }
    end
    # File ractor.rb, line 716
    def inspect
      loc  = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc }
      name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
      id   = __builtin_cexpr! %q{ UINT2NUM(rb_ractor_id(RACTOR_PTR(self))) }
      status = __builtin_cexpr! %q{
        rb_str_new2(ractor_status_str(RACTOR_PTR(self)->status_))
      }
      "#<Ractor:##{id}#{name ? ' '+name : ''}#{loc ? " " + loc : ''} #{status}>"
    end
The name set in Ractor.new, or nil.

    # File ractor.rb, line 729
    def name
      __builtin_cexpr! %q{RACTOR_PTR(self)->name}
    end
Sends a message to a Ractor's incoming queue, to be accepted by Ractor.receive.

    r = Ractor.new do
      value = Ractor.receive
      puts "Received #{value}"
    end
    r.send 'message'
    # Prints: "Received message"
This method is non-blocking (it returns immediately, even if the ractor is not ready to receive anything).

    r = Ractor.new {sleep(5)}
    r.send('test')
    puts "Sent successfully"
    # Prints: "Sent successfully" immediately
Attempting to send to a ractor that has already finished its execution raises Ractor::ClosedError.

    r = Ractor.new {}
    r.take
    p r
    # "#<Ractor:#6 (irb):23 terminated>"
    r.send('test')
    # Ractor::ClosedError (The incoming-port is already closed)

If close_incoming was called on the ractor, the method also raises Ractor::ClosedError.

    r = Ractor.new do
      sleep(500)
      receive
    end
    r.close_incoming
    r.send('test')
    # Ractor::ClosedError (The incoming-port is already closed)
    # The error is raised immediately, not when the ractor tries to receive

If obj is unshareable, by default it is copied into the receiving ractor by deep cloning. If move: true is passed, the object is moved into the receiving ractor and becomes inaccessible to the sender.

    r = Ractor.new {puts "Received: #{receive}"}
    msg = 'message'
    r.send(msg, move: true)
    r.take
    p msg

This prints:

    Received: message
    in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8>

All references the sender holds to the object and its parts become invalid.

    r = Ractor.new {puts "Received: #{receive}"}
    s = 'message'
    ary = [s]
    copy = ary.dup
    r.send(ary, move: true)

    s.inspect
    # Ractor::MovedError (can not send any methods to a moved object)
    ary.class
    # Ractor::MovedError (can not send any methods to a moved object)
    copy.class
    # => Array, it is a different object
    copy[0].inspect
    # Ractor::MovedError (can not send any methods to a moved object)
    # ...but its item was still a reference to `s`, which was moved

If the object is shareable, move: true has no effect on it:

    r = Ractor.new {puts "Received: #{receive}"}
    s = 'message'.freeze
    r.send(s, move: true)
    s.inspect #=> "message", still available

    # File ractor.rb, line 599
    def send(obj, move: false)
      __builtin_cexpr! %q{
        ractor_send(ec, RACTOR_PTR(self), obj, move)
      }
    end
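Gets a message from the ractor's outgoing port, which was put there by Ractor.yield or at the ractor's termination.

    r = Ractor.new do
      Ractor.yield 'explicit yield'
      'last value'
    end
    puts r.take #=> 'explicit yield'
    puts r.take #=> 'last value'
    puts r.take # Ractor::ClosedError (The outgoing-port is already closed)

Because the last value is also sent to the outgoing port, take can be used as an analog of Thread#join ("just wait until the ractor finishes"). However, it will raise if somebody has already consumed that message.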
If the outgoing port was closed with close_outgoing, the method raises Ractor::ClosedError.

    r = Ractor.new do
      sleep(500)
      Ractor.yield 'Hello from ractor'
    end
    r.close_outgoing
    r.take
    # Ractor::ClosedError (The outgoing-port is already closed)
    # The error is raised immediately, not when the ractor tries to yield
If an uncaught exception is raised in the Ractor, it is propagated through take as a Ractor::RemoteError.

    r = Ractor.new {raise "Something weird happened"}

    begin
      r.take
    rescue => e
      p e              # => #<Ractor::RemoteError: thrown by remote Ractor.>
      p e.ractor == r  # => true
      p e.cause        # => #<RuntimeError: Something weird happened>
    end

Ractor::ClosedError is a descendant of StopIteration, so the termination of the ractor will break out of any loop that receives this message, without propagating the error:

    r = Ractor.new do
      3.times {|i| Ractor.yield "message #{i}"}
      "finishing"
    end

    loop {puts "Received: " + r.take}
    puts "Continue successfully"

This will print:

    Received: message 0
    Received: message 1
    Received: message 2
    Received: finishing
    Continue successfully

    # File ractor.rb, line 710
    def take
      __builtin_cexpr! %q{
        ractor_take(ec, RACTOR_PTR(self))
      }
    end
Private Instance Methods
Same as Ractor.receive.

    # File ractor.rb, line 441
    def receive
      __builtin_cexpr! %q{
        ractor_receive(ec, rb_ec_ractor_ptr(ec))
      }
    end

Same as Ractor.receive_if.

    # File ractor.rb, line 514
    def receive_if &b
      Primitive.ractor_receive_if b
    end