类 Ractor

Ractor 是 Ruby 中的 Actor 模型抽象,提供线程安全的并行执行。

Ractor.new 创建一个新的 Ractor,可以并行运行。

# The simplest ractor
r = Ractor.new {puts "I am in Ractor!"}
r.take # wait for it to finish
# Here, "I am in Ractor!" is printed

Ractor 之间不共享所有对象。这样做有两个主要好处:在 Ractor 之间,数据竞争和竞态条件等线程安全问题是不可能的。另一个好处是并行性。

为了实现这一点,Ractor 之间的对象共享是有限制的。例如,与线程不同,Ractor 无法访问其他 Ractor 中可用的所有对象。即使是通常通过外部作用域中的变量可用的对象,也禁止在 Ractor 之间使用。

a = 1
r = Ractor.new {puts "I am in Ractor! a=#{a}"}
# fails immediately with
# ArgumentError (can not isolate a Proc because it accesses outer variables (a).)

对象必须显式共享

a = 1
r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}"}

在 CRuby(默认实现)上,每个 Ractor 都持有全局虚拟机锁 (GVL),因此 Ractor 可以并行执行而无需相互锁定。这与 CRuby 上线程的情况不同。

与其访问共享状态,不如通过将对象作为消息发送和接收来传递给 Ractor。

a = 1
r = Ractor.new do
  a_in_ractor = receive # receive blocks until somebody passes a message
  puts "I am in Ractor! a=#{a_in_ractor}"
end
r.send(a)  # pass it
r.take
# Here, "I am in Ractor! a=1" is printed

有两种发送/接收消息的方法对

除此之外,传递给 Ractor.new 的任何参数都将传递给块,并在那里可用,就像通过 Ractor.receive 接收一样,最后一个块值将发送到 Ractor 外部,就像通过 Ractor.yield 发送一样。

经典乒乓球的演示

server = Ractor.new(name: "server") do
  puts "Server starts: #{self.inspect}"
  puts "Server sends: ping"
  Ractor.yield 'ping'                       # The server doesn't know the receiver and sends to whoever interested
  received = Ractor.receive                 # The server doesn't know the sender and receives from whoever sent
  puts "Server received: #{received}"
end

client = Ractor.new(server) do |srv|        # The server is sent to the client, and available as srv
  puts "Client starts: #{self.inspect}"
  received = srv.take                       # The client takes a message from the server
  puts "Client received from " \
       "#{srv.inspect}: #{received}"
  puts "Client sends to " \
       "#{srv.inspect}: pong"
  srv.send 'pong'                           # The client sends a message to the server
end

[client, server].each(&:take)               # Wait until they both finish

这将输出类似以下内容

Server starts: #<Ractor:#2 server test.rb:1 running>
Server sends: ping
Client starts: #<Ractor:#3 test.rb:8 running>
Client received from #<Ractor:#2 server test.rb:1 blocking>: ping
Client sends to #<Ractor:#2 server test.rb:1 blocking>: pong
Server received: pong

Ractor 通过 *输入端口* 接收消息,并通过 *输出端口* 发送消息。 任何一个端口都可以使用 Ractor#close_incomingRactor#close_outgoing 分别禁用。 当 Ractor 终止时,其端口会自动关闭。

可共享和不可共享对象

当对象被发送到 Ractor 或从 Ractor 发送时,了解对象是可共享还是不可共享非常重要。 大多数 Ruby 对象都是不可共享对象。 即使是冻结对象,如果它们包含(通过其实例变量)未冻结对象,也可能是不可共享的。

可共享对象是可以被多个线程使用而不会影响线程安全性的对象,例如数字、truefalseRactor.shareable? 允许您检查这一点,而 Ractor.make_shareable 尝试使对象可共享(如果它还没有),如果它无法做到,则会报错。

Ractor.shareable?(1)            #=> true -- numbers and other immutable basic values are shareable
Ractor.shareable?('foo')        #=> false, unless the string is frozen due to # frozen_string_literal: true
Ractor.shareable?('foo'.freeze) #=> true
Ractor.shareable?([Object.new].freeze) #=> false, inner object is unfrozen

ary = ['hello', 'world']
ary.frozen?                 #=> false
ary[0].frozen?              #=> false
Ractor.make_shareable(ary)
ary.frozen?                 #=> true
ary[0].frozen?              #=> true
ary[1].frozen?              #=> true

当可共享对象被发送(通过 sendRactor.yield)时,不会对其进行任何额外的处理。 它只是变得可以被两个 Ractor 使用。 当不可共享对象被发送时,它可以被 *复制* 或 *移动*。 第一个是默认值,它通过深度克隆 (Object#clone) 其结构的不可共享部分来完全复制对象。

data = ['foo', 'bar'.freeze]
r = Ractor.new do
  data2 = Ractor.receive
  puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}"
end
r.send(data)
r.take
puts "Outside  : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}"

这将输出类似以下内容

In ractor: 340, 360, 320
Outside  : 380, 400, 320

请注意,数组和数组中未冻结字符串的 object id 在 Ractor 中发生了变化,因为它们是不同的对象。 第二个数组的元素,它是一个可共享的冻结字符串,是同一个对象。

对象的深度克隆可能很慢,有时甚至不可能。 或者,在发送时可以使用 move: true。 这将 *移动* 不可共享对象到接收 Ractor,使其无法被发送 Ractor 访问。

data = ['foo', 'bar']
r = Ractor.new do
  data_in_ractor = Ractor.receive
  puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}"
end
r.send(data, move: true)
r.take
puts "Outside: moved? #{Ractor::MovedObject === data}"
puts "Outside: #{data.inspect}"

这将输出

In ractor: 100, 120
Outside: moved? true
test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError)

请注意,即使 inspect(以及更基本的方法,如 __id__)也无法访问移动对象。

ClassModule 对象是可共享的,因此类/模块定义在 Ractor 之间共享。 Ractor 对象也是可共享的。 对可共享对象的所有操作都是线程安全的,因此线程安全属性将被保留。 我们无法在 Ruby 中定义可变的可共享对象,但 C 扩展可以引入它们。

如果变量的值不可共享,则禁止在其他 Ractor 中访问(获取)可共享对象的实例变量。 这种情况可能发生,因为模块/类是可共享的,但它们可以具有实例变量,其值不可共享。 在非主 Ractor 中,也禁止在类/模块上设置实例变量(即使值是可共享的)。

class C
  class << self
    attr_accessor :tricky
  end
end

C.tricky = "unshareable".dup

r = Ractor.new(C) do |cls|
  puts "I see #{cls}"
  puts "I can't see #{cls.tricky}"
  cls.tricky = true # doesn't get here, but this would also raise an error
end
r.take
# I see C
# can not access instance variables of classes/modules from non-main Ractors (RuntimeError)

如果常量是可共享的,那么 Ractor 可以访问它们。主 Ractor 是唯一可以访问不可共享常量的 Ractor。

GOOD = 'good'.freeze
BAD = 'bad'.dup

r = Ractor.new do
  puts "GOOD=#{GOOD}"
  puts "BAD=#{BAD}"
end
r.take
# GOOD=good
# can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError)

# Consider the same C class from above

r = Ractor.new do
  puts "I see #{C}"
  puts "I can't see #{C.tricky}"
end
r.take
# I see C
# can not access instance variables of classes/modules from non-main Ractors (RuntimeError)

另请参阅 注释语法 说明中对 # shareable_constant_value 编译指示的描述。

Ractor 与线程

每个 Ractor 都有自己的主 Thread。可以在 Ractor 内部创建新线程(并且在 CRuby 上,它们与该 Ractor 的其他线程共享 GVL)。

r = Ractor.new do
  a = 1
  Thread.new {puts "Thread in ractor: a=#{a}"}.join
end
r.take
# Here "Thread in ractor: a=1" will be printed

关于代码示例的说明

在下面的示例中,我们有时会使用以下方法来等待当前未阻塞的 Ractor 完成(或取得进展)。

def wait
  sleep(0.1)
end

这 **仅用于演示目的**,不应在实际代码中使用。在大多数情况下,take 用于等待 Ractor 完成。

参考

有关更多详细信息,请参阅 Ractor 设计文档

公共类方法

count() 点击以切换源代码

返回当前正在运行或阻塞(等待)的 Ractor 数量。

Ractor.count                   #=> 1
r = Ractor.new(name: 'example') { Ractor.yield(1) }
Ractor.count                   #=> 2 (main + example ractor)
r.take                         # wait for Ractor.yield(1)
r.take                         # wait until r will finish
Ractor.count                   #=> 1
# File ractor.rb, line 302
def self.count
  __builtin_cexpr! %q{
    ULONG2NUM(GET_VM()->ractor.cnt);
  }
end
current() 点击以切换源代码

返回当前正在执行的 Ractor

Ractor.current #=> #<Ractor:#1 running>
# File ractor.rb, line 288
def self.current
  __builtin_cexpr! %q{
    rb_ractor_self(rb_ec_ractor_ptr(ec));
  }
end
main() 点击以切换源代码

返回主 Ractor

# File ractor.rb, line 848
def self.main
  __builtin_cexpr! %q{
    rb_ractor_self(GET_VM()->ractor.main_ractor);
  }
end
make_shareable(obj, copy: false) → shareable_obj 点击以切换源代码

使 obj 在 Ractor 之间可共享。

obj 及其引用的所有对象都将被冻结,除非它们已经是可共享的。

如果 copy 关键字为 true,它将在冻结对象之前复制它们,并且不会修改 obj 或其内部对象。

请注意,此方法的规范和实现尚未成熟,将来可能会更改。

obj = ['test']
Ractor.shareable?(obj)     #=> false
Ractor.make_shareable(obj) #=> ["test"]
Ractor.shareable?(obj)     #=> true
obj.frozen?                #=> true
obj[0].frozen?             #=> true

# Copy vs non-copy versions:
obj1 = ['test']
obj1s = Ractor.make_shareable(obj1)
obj1.frozen?                        #=> true
obj1s.object_id == obj1.object_id   #=> true
obj2 = ['test']
obj2s = Ractor.make_shareable(obj2, copy: true)
obj2.frozen?                        #=> false
obj2s.frozen?                       #=> true
obj2s.object_id == obj2.object_id   #=> false
obj2s[0].object_id == obj2[0].object_id #=> false

另请参阅 Ractor 类文档中的“可共享和不可共享对象”部分。

# File ractor.rb, line 825
def self.make_shareable obj, copy: false
  if copy
    __builtin_cexpr! %q{
      rb_ractor_make_shareable_copy(obj);
    }
  else
    __builtin_cexpr! %q{
      rb_ractor_make_shareable(obj);
    }
  end
end
new(*args, name: nil) {|*args| block } → ractor 点击以切换源代码

使用参数和块创建一个新的 Ractor。

给定的块 (Proc) 将被隔离(无法访问任何外部变量)。块内部的 self 将引用当前 Ractor。

r = Ractor.new { puts "Hi, I am #{self.inspect}" }
r.take
# Prints "Hi, I am #<Ractor:#2 test.rb:1 running>"

传递的任何 args 都将通过与通过 send/Ractor.receive 发送的对象相同的规则传播到块参数。如果 args 中的参数不可共享,它将被复制(通过深度克隆,这可能效率低下)。

arg = [1, 2, 3]
puts "Passing: #{arg} (##{arg.object_id})"
r = Ractor.new(arg) {|received_arg|
  puts "Received: #{received_arg} (##{received_arg.object_id})"
}
r.take
# Prints:
#   Passing: [1, 2, 3] (#280)
#   Received: [1, 2, 3] (#300)

Ractor 的 name 可以设置为调试目的

r = Ractor.new(name: 'my ractor') {}; r.take
p r
#=> #<Ractor:#3 my ractor test.rb:1 terminated>
# File ractor.rb, line 273
def self.new(*args, name: nil, &block)
  b = block # TODO: builtin bug
  raise ArgumentError, "must be called with a block" unless block
  if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)")
    warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \
         "Also there are many implementation issues.", uplevel: 0, category: :experimental)
  end
  loc = caller_locations(1, 1).first
  loc = "#{loc.path}:#{loc.lineno}"
  __builtin_ractor_create(loc, name, args, b)
end
receive → msg 点击以切换源代码

从当前 Ractor 的传入端口接收消息(该消息是由另一个 Ractor 通过 send 发送到那里的)。

r = Ractor.new do
  v1 = Ractor.receive
  puts "Received: #{v1}"
end
r.send('message1')
r.take
# Here will be printed: "Received: message1"

或者,可以使用私有实例方法 receive

r = Ractor.new do
  v1 = receive
  puts "Received: #{v1}"
end
r.send('message1')
r.take
# This prints: "Received: message1"

如果队列为空,则方法会阻塞。

r = Ractor.new do
  puts "Before first receive"
  v1 = Ractor.receive
  puts "Received: #{v1}"
  v2 = Ractor.receive
  puts "Received: #{v2}"
end
wait
puts "Still not received"
r.send('message1')
wait
puts "Still received only one"
r.send('message2')
r.take

输出

Before first receive
Still not received
Received: message1
Still received only one
Received: message2

如果在 ractor 上调用了 close_incoming,则如果传入队列中没有更多消息,该方法会引发 Ractor::ClosedError

Ractor.new do
  close_incoming
  receive
end
wait
# in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError)
# File ractor.rb, line 430
def self.receive
  __builtin_cexpr! %q{
    ractor_receive(ec, rb_ec_ractor_ptr(ec))
  }
end
也称为:recv
receive_if {|msg| block } → msg 点击切换源代码

仅接收特定消息。

Ractor.receive 不同,Ractor.receive_if 可以使用块中的模式(或任何过滤器)来选择 ractor 的传入队列中可用的消息。

r = Ractor.new do
  p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3"
  p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1"
  p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2"
end
r << "bar1"
r << "baz2"
r << "foo3"
r.take

这将输出

foo3
bar1
baz2

如果块返回真值,则消息将从传入队列中删除并返回。否则,消息将保留在传入队列中,并且给定块将检查下一条消息。

如果传入队列中没有剩余消息,则该方法将阻塞,直到有新消息到达。

如果块通过 break/return/exception/throw 退出,则消息将从传入队列中删除,就像返回了真值一样。

r = Ractor.new do
  val = Ractor.receive_if{|msg| msg.is_a?(Array)}
  puts "Received successfully: #{val}"
end

r.send(1)
r.send('test')
wait
puts "2 non-matching sent, nothing received"
r.send([1, 2, 3])
wait

打印

2 non-matching sent, nothing received
Received successfully: [1, 2, 3]

请注意,您不能在给定块中递归调用 receive/receive_if。您不应该在块中执行除消息过滤之外的任何任务。

Ractor.current << true
Ractor.receive_if{|msg| Ractor.receive}
#=> `receive': can not call receive/receive_if recursively (Ractor::Error)
# File ractor.rb, line 509
def self.receive_if &b
  Primitive.ractor_receive_if b
end
recv
别名:receive
select(*ractors, [yield_value:, move: false]) → [ractor or symbol, obj] 点击切换源代码

等待任何 ractor 在其输出端口中拥有内容,从该 ractor 读取,然后返回该 ractor 和接收到的对象。

r1 = Ractor.new {Ractor.yield 'from 1'}
r2 = Ractor.new {Ractor.yield 'from 2'}

r, obj = Ractor.select(r1, r2)

puts "received #{obj.inspect} from #{r.inspect}"
# Prints: received "from 1" from #<Ractor:#2 test.rb:1 running>
# But could just as well print "from r2" here, either prints could be first.

如果给定的 ractor 之一是当前 ractor,并且它被选中,则 r 将包含 :receive 符号而不是 ractor 对象。

r1 = Ractor.new(Ractor.current) do |main|
  main.send 'to main'
  Ractor.yield 'from 1'
end
r2 = Ractor.new do
  Ractor.yield 'from 2'
end

r, obj = Ractor.select(r1, r2, Ractor.current)
puts "received #{obj.inspect} from #{r.inspect}"
# Could print: received "to main" from :receive

如果提供了 yield_value,则如果另一个 ractor 正在调用 take,则可以生成该值。在这种情况下,将返回对 [:yield, nil]

r1 = Ractor.new(Ractor.current) do |main|
  puts "Received from main: #{main.take}"
end

puts "Trying to select"
r, obj = Ractor.select(r1, Ractor.current, yield_value: 123)
wait
puts "Received #{obj.inspect} from #{r.inspect}"

这将打印

Trying to select
Received from main: 123
Received nil from :yield

move 布尔标志定义是否复制(默认)或移动生成的值。

# File ractor.rb, line 358
def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
  raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty?

  if ractors.delete Ractor.current
    do_receive = true
  else
    do_receive = false
  end

  __builtin_ractor_select_internal ractors, do_receive, !yield_unspecified, yield_value, move
end
shareable?(obj) → true | false 点击切换源代码

检查对象是否可以被 ractor 共享。

Ractor.shareable?(1)            #=> true -- numbers and other immutable basic values are frozen
Ractor.shareable?('foo')        #=> false, unless the string is frozen due to # frozen_string_literal: true
Ractor.shareable?('foo'.freeze) #=> true

另请参阅 Ractor 类文档中的“可共享和不可共享对象”部分。

# File ractor.rb, line 784
def self.shareable? obj
  __builtin_cexpr! %q{
    RBOOL(rb_ractor_shareable_p(obj));
  }
end
yield(msg, move: false) → nil 点击切换源代码

将消息发送到当前 Ractor 的输出端口,以便被 take 接收。

r = Ractor.new {Ractor.yield 'Hello from ractor'}
puts r.take
# Prints: "Hello from ractor"

此方法是阻塞的,只有在有人消费发送的消息时才会返回。

r = Ractor.new do
  Ractor.yield 'Hello from ractor'
  puts "Ractor: after yield"
end
wait
puts "Still not taken"
puts r.take

这将打印

Still not taken
Hello from ractor
Ractor: after yield

如果输出端口已使用 close_outgoing 关闭,则该方法将引发

r = Ractor.new do
  close_outgoing
  Ractor.yield 'Hello from ractor'
end
wait
# `yield': The outgoing-port is already closed (Ractor::ClosedError)

move 参数的含义与 send 相同。

# File ractor.rb, line 643
def self.yield(obj, move: false)
  __builtin_cexpr! %q{
    ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move)
  }
end

公共实例方法

<<
别名:send
[](sym) 点击切换源代码

从 Ractor 本地存储中获取值

# File ractor.rb, line 838
def [](sym)
  Primitive.ractor_local_value(sym)
end
[]=(sym, val) 点击切换源代码

在 Ractor 本地存储中设置值

# File ractor.rb, line 843
def []=(sym, val)
  Primitive.ractor_local_value_set(sym, val)
end
close_incoming → true | false 点击切换源代码

关闭输入端口并返回它是否已关闭。所有进一步尝试在 Ractor 中 Ractor.receive,以及向 Ractor 发送 send 将失败,并出现 Ractor::ClosedError 错误。

r = Ractor.new {sleep(500)}
r.close_incoming  #=> false
r.close_incoming  #=> true
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)
# File ractor.rb, line 749
def close_incoming
  __builtin_cexpr! %q{
    ractor_close_incoming(ec, RACTOR_PTR(self));
  }
end
close_outgoing → true | false 点击切换源代码

关闭输出端口并返回它是否已关闭。所有进一步尝试在 Ractor 中 Ractor.yield,以及从 Ractor 中 take 将失败,并出现 Ractor::ClosedError 错误。

r = Ractor.new {sleep(500)}
r.close_outgoing  #=> false
r.close_outgoing  #=> true
r.take
# Ractor::ClosedError (The outgoing-port is already closed)
# File ractor.rb, line 767
def close_outgoing
  __builtin_cexpr! %q{
    ractor_close_outgoing(ec, RACTOR_PTR(self));
  }
end
inspect() 点击切换源代码
# File ractor.rb, line 716
def inspect
  loc  = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc }
  name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
  id   = __builtin_cexpr! %q{ UINT2NUM(rb_ractor_id(RACTOR_PTR(self))) }
  status = __builtin_cexpr! %q{
    rb_str_new2(ractor_status_str(RACTOR_PTR(self)->status_))
  }
  "#<Ractor:##{id}#{name ? ' '+name : ''}#{loc ? " " + loc : ''} #{status}>"
end
也称为:to_s
name() 点击切换源代码

Ractor.new 中设置的名称,或 nil

# File ractor.rb, line 729
def name
  __builtin_cexpr! %q{RACTOR_PTR(self)->name}
end
recv()
别名:receive
send(msg, move: false) → self 点击切换源代码

将消息发送到 Ractor 的输入队列,以便被 Ractor.receive 接收。

r = Ractor.new do
  value = Ractor.receive
  puts "Received #{value}"
end
r.send 'message'
# Prints: "Received: message"

该方法是非阻塞的(即使 Ractor 尚未准备好接收任何内容,也会立即返回)。

r = Ractor.new {sleep(5)}
r.send('test')
puts "Sent successfully"
# Prints: "Sent successfully" immediately

尝试向已完成执行的 Ractor 发送消息将引发 Ractor::ClosedError 错误。

r = Ractor.new {}
r.take
p r
# "#<Ractor:#6 (irb):23 terminated>"
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)

如果在该 ractor 上调用了 close_incoming,该方法也会引发 Ractor::ClosedError

r =  Ractor.new do
  sleep(500)
  receive
end
r.close_incoming
r.send('test')
# Ractor::ClosedError (The incoming-port is already closed)
# The error is raised immediately, not when the ractor tries to receive

如果 obj 不可共享,默认情况下它将通过深度克隆复制到接收 ractor 中。如果传递了 move: true,则该对象将被移动到接收 ractor 中,并且发送方将无法访问它。

r = Ractor.new {puts "Received: #{receive}"}
msg = 'message'
r.send(msg, move: true)
r.take
p msg

这将打印

Received: message
in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8>

发送方对该对象及其所有部分的引用将变得无效。

r = Ractor.new {puts "Received: #{receive}"}
s = 'message'
ary = [s]
copy = ary.dup
r.send(ary, move: true)

s.inspect
# Ractor::MovedError (can not send any methods to a moved object)
ary.class
# Ractor::MovedError (can not send any methods to a moved object)
copy.class
# => Array, it is different object
copy[0].inspect
# Ractor::MovedError (can not send any methods to a moved object)
# ...but its item was still a reference to `s`, which was moved

如果该对象是可共享的,move: true 对它没有影响。

r = Ractor.new {puts "Received: #{receive}"}
s = 'message'.freeze
r.send(s, move: true)
s.inspect #=> "message", still available
# File ractor.rb, line 599
def send(obj, move: false)
  __builtin_cexpr! %q{
    ractor_send(ec, RACTOR_PTR(self), obj, move)
  }
end
也称为:<<
take → msg 点击以切换源代码

从 ractor 的输出端口获取一条消息,该消息由 Ractor.yield 或在 ractor 终止时放入。

r = Ractor.new do
  Ractor.yield 'explicit yield'
  'last value'
end
puts r.take #=> 'explicit yield'
puts r.take #=> 'last value'
puts r.take # Ractor::ClosedError (The outgoing-port is already closed)

最后一个值也被发送到输出端口这一事实意味着 take 可以用作 Thread#join 的模拟(“只需等待 ractor 完成”)。但是,如果有人已经消费了该消息,它将引发异常。

如果输出端口使用 close_outgoing 关闭,该方法将引发 Ractor::ClosedError

r = Ractor.new do
  sleep(500)
  Ractor.yield 'Hello from ractor'
end
r.close_outgoing
r.take
# Ractor::ClosedError (The outgoing-port is already closed)
# The error would be raised immediately, not when ractor will try to receive

如果在 Ractor 中引发了未捕获的异常,它将通过 take 作为 Ractor::RemoteError 传播。

r = Ractor.new {raise "Something weird happened"}

begin
  r.take
rescue => e
  p e              #  => #<Ractor::RemoteError: thrown by remote Ractor.>
  p e.ractor == r  # => true
  p e.cause        # => #<RuntimeError: Something weird happened>
end

Ractor::ClosedErrorStopIteration 的后代,因此 ractor 的终止将使任何接收此消息的循环退出,而不会传播错误。

r = Ractor.new do
  3.times {|i| Ractor.yield "message #{i}"}
  "finishing"
end

loop {puts "Received: " + r.take}
puts "Continue successfully"

这将打印

Received: message 0
Received: message 1
Received: message 2
Received: finishing
Continue successfully
# File ractor.rb, line 710
def take
  __builtin_cexpr! %q{
    ractor_take(ec, RACTOR_PTR(self))
  }
end
to_s()
别名:inspect

私有实例方法

receive() 点击以切换源代码

Ractor.receive 相同

# File ractor.rb, line 441
        def receive
  __builtin_cexpr! %q{
    ractor_receive(ec, rb_ec_ractor_ptr(ec))
  }
end
也称为:recv
receive_if(&b) 点击以切换源代码

Ractor.receive_if 相同

# File ractor.rb, line 514
        def receive_if &b
  Primitive.ractor_receive_if b
end