Fiber 类

Fiber 是在 Ruby 中实现轻量级协作并发的原语。从本质上讲,它们是一种创建可暂停和恢复的代码块的方法,很像线程。主要区别在于它们永远不会被抢占,并且调度必须由程序员完成,而不是 VM。

与其他无栈轻量级并发模型不同,每个 Fiber 都带有栈。这使 Fiber 能够从 Fiber 块内的深度嵌套函数调用中暂停。请参阅 ruby(1) 手册页以配置 Fiber 栈的大小。

创建 Fiber 时,它不会自动运行。相反,必须使用 Fiber#resume 方法显式要求它运行。Fiber 内运行的代码可以通过调用 Fiber.yield 放弃控制,在这种情况下,它会将控制权让回调用方(Fiber#resume 的调用方)。

让步或终止后,Fiber 将返回最后执行的表达式的值

例如

fiber = Fiber.new do
  Fiber.yield 1
  2
end

puts fiber.resume
puts fiber.resume
puts fiber.resume

产生

1
2
FiberError: dead fiber called

Fiber#resume 方法接受任意数量的参数,如果这是对 resume 的第一次调用,那么它们将作为块参数传递。否则,它们将是 Fiber.yield 调用的返回值

示例

fiber = Fiber.new do |first|
  second = Fiber.yield first + 2
end

puts fiber.resume 10
puts fiber.resume 1_000_000
puts fiber.resume "The fiber will be dead before I can cause trouble"

产生

12
1000000
FiberError: dead fiber called

非阻塞 Fiber

非阻塞 Fiber 的概念是在 Ruby 3.0 中引入的。当非阻塞 Fiber 达到通常会阻塞 Fiber 的操作(如 sleep 或等待另一个进程或 I/O)时,它会将控制权让给其他 Fiber,并允许调度程序处理阻塞并在可以继续时唤醒(恢复)此 Fiber。

要使 Fiber 表现为非阻塞,需要在 Fiber.new 中使用 blocking: false(这是默认值)创建它,并且应使用 Fiber.set_scheduler 设置 Fiber.scheduler。如果在当前线程中未设置 Fiber.scheduler,则阻塞和非阻塞 Fiber 的行为是相同的。

Ruby 不提供调度程序类:它应该由用户实现,并对应于 Fiber::Scheduler

还有 Fiber.schedule 方法,它预计以非阻塞方式立即执行给定的块。它的实际实现取决于调度程序。

公共类方法

Fiber[key] → value 单击以切换源

返回由 key 标识的 fiber 存储变量的值。

key 必须是符号,并且值由 Fiber#[]= 或 Fiber#store 设置。

另请参阅 Fiber::[]=

static VALUE
rb_fiber_storage_aref(VALUE class, VALUE key)
{
    Check_Type(key, T_SYMBOL);

    VALUE storage = fiber_storage_get(fiber_current(), FALSE);
    if (storage == Qnil) return Qnil;

    return rb_hash_aref(storage, key);
}
Fiber[key] = value 单击以切换源

value 分配给由 key 标识的 fiber 存储变量。如果变量不存在,则创建该变量。

key 必须是 Symbol,否则会引发 TypeError

另请参阅 Fiber::[]

static VALUE
rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value)
{
    Check_Type(key, T_SYMBOL);

    VALUE storage = fiber_storage_get(fiber_current(), value != Qnil);
    if (storage == Qnil) return Qnil;

    if (value == Qnil) {
        return rb_hash_delete(storage, key);
    }
    else {
        return rb_hash_aset(storage, key, value);
    }
}
blocking{|fiber| ...} → result 单击以切换源

强制 fiber 在块的持续时间内处于阻塞状态。返回块的结果。

有关详细信息,请参阅类文档中的“非阻塞 fiber”部分。

VALUE
rb_fiber_blocking(VALUE class)
{
    VALUE fiber_value = rb_fiber_current();
    rb_fiber_t *fiber = fiber_ptr(fiber_value);

    // If we are already blocking, this is essentially a no-op:
    if (fiber->blocking) {
        return rb_yield(fiber_value);
    }
    else {
        return rb_ensure(fiber_blocking_yield, fiber_value, fiber_blocking_ensure, fiber_value);
    }
}
blocking? → false 或 1 单击以切换源

如果当前 fiber 是非阻塞的,则返回 false。如果通过将 blocking: false 传递给 Fiber.new 或通过 Fiber.schedule 创建 Fiber,则 Fiber 是非阻塞的。

如果当前 Fiber 处于阻塞状态,则该方法返回 1。未来的发展可能会允许返回更大的整数的情况。

请注意,即使该方法返回 falseFiber 仅在当前线程中设置 Fiber.scheduler 时才表现得不同。

有关详细信息,请参阅类文档中的“非阻塞 fiber”部分。

static VALUE
rb_fiber_s_blocking_p(VALUE klass)
{
    rb_thread_t *thread = GET_THREAD();
    unsigned blocking = thread->blocking;

    if (blocking == 0)
        return Qfalse;

    return INT2NUM(blocking);
}
current → fiber 单击以切换源

返回当前 fiber。如果您不在 fiber 的上下文中运行,则此方法将返回根 fiber。

static VALUE
rb_fiber_s_current(VALUE klass)
{
    return rb_fiber_current();
}
current_scheduler → obj 或 nil 单击以切换源

返回 Fiber 调度程序,该调度程序上次是使用 Fiber.set_scheduler 为当前线程设置的,当且仅当当前 fiber 是非阻塞的时。

static VALUE
rb_fiber_current_scheduler(VALUE klass)
{
    return rb_fiber_scheduler_current();
}
new(blocking: false, storage: true) { |*args| ... } → fiber 单击以切换源

创建新的 Fiber。最初,该 fiber 并未运行,可以使用 resume 恢复运行。第一个 resume 调用的参数将传递给该块

f = Fiber.new do |initial|
   current = initial
   loop do
     puts "current: #{current.inspect}"
     current = Fiber.yield
   end
end
f.resume(100)     # prints: current: 100
f.resume(1, 2, 3) # prints: current: [1, 2, 3]
f.resume          # prints: current: nil
# ... and so on ...

如果将 blocking: false 传递给 Fiber.new并且当前线程已定义 Fiber.scheduler,则 Fiber 将变为非阻塞(请参阅类文档中的“非阻塞 Fiber”部分)。

如果未指定 storage,则默认继承当前 fiber 的存储副本。这与指定 storage: true 相同。

Fiber[:x] = 1
Fiber.new do
  Fiber[:x] # => 1
  Fiber[:x] = 2
end.resume
Fiber[:x] # => 1

如果给定的 storagenil,则此函数将延迟初始化内部存储,该存储最初为空哈希。

Fiber[:x] = "Hello World"
Fiber.new(storage: nil) do
  Fiber[:x] # nil
end

否则,给定的 storage 将用作新 fiber 的存储,并且它必须是 Hash 的实例。

目前,显式使用 storage: true 属于实验性质,将来可能会更改。

static VALUE
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
{
    return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p());
}
schedule { |*args| ... } → fiber 单击以切换源

此方法预期在单独的非阻塞 fiber 中立即运行提供的代码块。

puts "Go to sleep!"

Fiber.set_scheduler(MyScheduler.new)

Fiber.schedule do
  puts "Going to sleep"
  sleep(1)
  puts "I slept well"
end

puts "Wakey-wakey, sleepyhead"

假设 MyScheduler 已正确实现,则此程序将生成

Go to sleep!
Going to sleep
Wakey-wakey, sleepyhead
...1 sec pause here...
I slept well

…例如,在 Fibersleep(1))内的第一个阻塞操作中,控制权将让渡给外部代码(主 fiber),并且在该执行结束时,调度程序将负责正确恢复所有阻塞的 fiber。

请注意,上面描述的行为是此方法预期的行为,实际行为取决于当前调度程序对 Fiber::Scheduler#fiber 方法的实现。Ruby 不会强制此方法以任何特定方式运行。

如果未设置调度程序,则此方法将引发 RuntimeError (No scheduler is available!)

static VALUE
rb_fiber_s_schedule(int argc, VALUE *argv, VALUE obj)
{
    return rb_fiber_s_schedule_kw(argc, argv, rb_keyword_given_p());
}
scheduler → obj or nil 单击以切换源

返回 Fiber 调度程序,该调度程序最后通过 Fiber.set_scheduler 为当前线程设置。如果未设置调度程序(这是默认设置),则返回 nil,并且非阻塞 fiber 的行为与阻塞相同。(有关调度程序概念的详细信息,请参阅类文档中的“非阻塞 fiber”部分)。

static VALUE
rb_fiber_s_scheduler(VALUE klass)
{
    return rb_fiber_scheduler_get();
}
set_scheduler(scheduler) → scheduler 单击以切换源

为当前线程设置 Fiber 调度程序。如果设置了调度程序,则非阻塞 fiber(由 Fiber.new 使用 blocking: false 创建,或由 Fiber.schedule 创建)将在潜在的阻塞操作中调用该调度程序的钩子方法,并且当前线程将在最终确定时调用调度程序的 close 方法(允许调度程序正确管理所有未完成的 fiber)。

scheduler 可以是对应于 Fiber::Scheduler 的任何类的对象。它的实现取决于用户。

另请参阅类文档中的“非阻塞 fiber”部分。

static VALUE
rb_fiber_set_scheduler(VALUE klass, VALUE scheduler)
{
    return rb_fiber_scheduler_set(scheduler);
}
yield(args, ...) → obj 点击切换来源

将控制权让回到恢复 fiber 的上下文中,传递任何传递给它的参数。当接下来调用 resume 时,fiber 将在此处恢复处理。传递给下一个 resume 的任何参数都将是此 Fiber.yield 表达式求值的结果。

static VALUE
rb_fiber_s_yield(int argc, VALUE *argv, VALUE klass)
{
    return rb_fiber_yield_kw(argc, argv, rb_keyword_given_p());
}

公共实例方法

alive? → true 或 false 点击切换来源

如果 fiber 仍然可以恢复(或转移到),则返回 true。在完成 fiber 块的执行后,此方法将始终返回 false

VALUE
rb_fiber_alive_p(VALUE fiber_value)
{
    return RBOOL(!FIBER_TERMINATED_P(fiber_ptr(fiber_value)));
}
backtrace → 数组 点击切换来源
backtrace(start) → 数组
backtrace(start, count) → 数组
backtrace(start..end) → 数组

返回 fiber 的当前执行堆栈。startcountend 允许仅选择 backtrace 的部分。

def level3
  Fiber.yield
end

def level2
  level3
end

def level1
  level2
end

f = Fiber.new { level1 }

# It is empty before the fiber started
f.backtrace
#=> []

f.resume

f.backtrace
#=> ["test.rb:2:in `yield'", "test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"]
p f.backtrace(1) # start from the item 1
#=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'", "test.rb:13:in `block in <main>'"]
p f.backtrace(2, 2) # start from item 2, take 2
#=> ["test.rb:6:in `level2'", "test.rb:10:in `level1'"]
p f.backtrace(1..3) # take items from 1 to 3
#=> ["test.rb:2:in `level3'", "test.rb:6:in `level2'", "test.rb:10:in `level1'"]

f.resume

# It is nil after the fiber is finished
f.backtrace
#=> nil
static VALUE
rb_fiber_backtrace(int argc, VALUE *argv, VALUE fiber)
{
    return rb_vm_backtrace(argc, argv, &fiber_ptr(fiber)->cont.saved_ec);
}
backtrace_locations → 数组 点击切换来源
backtrace_locations(start) → 数组
backtrace_locations(start, count) → 数组
backtrace_locations(start..end) → 数组

类似于 backtrace,但将执行堆栈的每一行作为 Thread::Backtrace::Location 返回。接受与 backtrace 相同的参数。

f = Fiber.new { Fiber.yield }
f.resume
loc = f.backtrace_locations.first
loc.label  #=> "yield"
loc.path   #=> "test.rb"
loc.lineno #=> 1
static VALUE
rb_fiber_backtrace_locations(int argc, VALUE *argv, VALUE fiber)
{
    return rb_vm_backtrace_locations(argc, argv, &fiber_ptr(fiber)->cont.saved_ec);
}
blocking? → true 或 false 点击切换来源

如果 fiber 正在阻塞,则返回 true,否则返回 false。如果通过将 blocking: false 传递给 Fiber.new 或通过 Fiber.schedule 创建 Fiber,则 Fiber 是非阻塞的。

请注意,即使该方法返回 false,fiber 仅在当前线程中设置 Fiber.scheduler 时才会表现得不同。

有关详细信息,请参阅类文档中的“非阻塞 fiber”部分。

VALUE
rb_fiber_blocking_p(VALUE fiber)
{
    return RBOOL(fiber_ptr(fiber)->blocking);
}
inspect()
别名:to_s
kill → nil 点击切换源代码

通过引发不可捕捉的异常终止协程。它只终止给定的协程,不终止其他协程,如果该协程正在调用 resumetransfer,则向其他协程返回 nil

Fiber#kill 仅在 Fiber.yield 中时中断其他协程。如果在当前协程上调用,则会在 Fiber#kill 调用站点引发该异常。

如果协程尚未启动,则直接过渡到终止状态。

如果协程已经终止,则不执行任何操作。

如果对属于另一个线程的协程调用,则引发 FiberError

static VALUE
rb_fiber_m_kill(VALUE self)
{
    rb_fiber_t *fiber = fiber_ptr(self);

    if (fiber->killed) return Qfalse;
    fiber->killed = 1;

    if (fiber->status == FIBER_CREATED) {
        fiber->status = FIBER_TERMINATED;
    }
    else if (fiber->status != FIBER_TERMINATED) {
        if (fiber_current() == fiber) {
            fiber_check_killed(fiber);
        } else {
            fiber_raise(fiber_ptr(self), Qnil);
        }
    }

    return self;
}
raise → obj 点击切换源代码
raise(string) → obj
raise(exception [, string [, array]]) → obj

在最后一次调用 Fiber.yield 的点上在协程中引发异常。如果协程尚未启动或已经运行到完成,则引发 FiberError。如果协程正在 yield,则恢复协程。如果协程正在传输,则将协程传输到其中。但如果协程正在恢复,则引发 FiberError

如果没有参数,则引发 RuntimeError。如果只有一个 String 参数,则引发 RuntimeError,并使用该字符串作为消息。否则,第一个参数应该是 Exception 类的名称(或在发送 exception 消息时返回 Exception 对象的对象)。可选的第二个参数设置与异常关联的消息,第三个参数是回调信息的数组。异常由 begin...end 块的 rescue 子句捕获。

如果对属于另一个 ThreadFiber 调用,则引发 FiberError

static VALUE
rb_fiber_m_raise(int argc, VALUE *argv, VALUE self)
{
    return rb_fiber_raise(self, argc, argv);
}
resume(args, ...) → obj 点击切换源代码

从最后一次调用 Fiber.yield 的点恢复协程,或者如果这是对 resume 的第一次调用,则开始运行协程。传递给 resume 的参数将是 Fiber.yield 表达式的值,或者如果这是第一次 resume,则将作为块参数传递给协程的块。

或者,当调用 resume 时,它将计算为传递给协程块内的下一个 Fiber.yield 语句的参数,或者如果协程在没有任何 Fiber.yield 的情况下运行到完成,则计算为块值。

static VALUE
rb_fiber_m_resume(int argc, VALUE *argv, VALUE fiber)
{
    return rb_fiber_resume_kw(fiber, argc, argv, rb_keyword_given_p());
}
storage → hash (dup) 点击切换源代码

返回协程的存储哈希的副本。该方法只能在 Fiber.current 上调用。

static VALUE
rb_fiber_storage_get(VALUE self)
{
    storage_access_must_be_from_same_fiber(self);

    VALUE storage = fiber_storage_get(fiber_ptr(self), FALSE);

    if (storage == Qnil) {
        return Qnil;
    }
    else {
        return rb_obj_dup(storage);
    }
}
storage = hash 点击切换源代码

设置 fiber 的存储哈希。此功能为实验性功能,未来可能会更改。此方法只能在 Fiber.current 上调用。

使用此方法时应小心,因为你可能会无意中清除重要的 fiber 存储状态。你应该更喜欢使用 Fiber::[]= 在存储中分配特定键。

你还可以使用 Fiber.new(storage: nil) 创建一个具有空存储的 fiber。

示例

while request = request_queue.pop
  # Reset the per-request state:
  Fiber.current.storage = nil
  handle_request(request)
end
static VALUE
rb_fiber_storage_set(VALUE self, VALUE value)
{
    if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) {
        rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL,
          "Fiber#storage= is experimental and may be removed in the future!");
    }

    storage_access_must_be_from_same_fiber(self);
    fiber_storage_validate(value);

    fiber_ptr(self)->cont.saved_ec.storage = rb_obj_dup(value);
    return value;
}
to_s() 点击切换源代码
static VALUE
fiber_to_s(VALUE fiber_value)
{
    const rb_fiber_t *fiber = fiber_ptr(fiber_value);
    const rb_proc_t *proc;
    char status_info[0x20];

    if (fiber->resuming_fiber) {
        snprintf(status_info, 0x20, " (%s by resuming)", fiber_status_name(fiber->status));
    }
    else {
        snprintf(status_info, 0x20, " (%s)", fiber_status_name(fiber->status));
    }

    if (!rb_obj_is_proc(fiber->first_proc)) {
        VALUE str = rb_any_to_s(fiber_value);
        strlcat(status_info, ">", sizeof(status_info));
        rb_str_set_len(str, RSTRING_LEN(str)-1);
        rb_str_cat_cstr(str, status_info);
        return str;
    }
    GetProcPtr(fiber->first_proc, proc);
    return rb_block_to_s(fiber_value, &proc->block, status_info);
}
别名:inspect
transfer(args, ...) → obj 点击切换源代码

将控制权转移到另一个 fiber,从它上次停止的位置恢复它,或者在它之前未恢复时启动它。调用 fiber 将被挂起,就像调用 Fiber.yield 一样。

接收 transfer 调用的 fiber 将其视为恢复调用。传递给 transfer 的参数将被视为传递给恢复的参数。

两种向 fiber 传递和从 fiber 传递控制的方式(一种是 resumeFiber::yield,另一种是 transfer 到和从 fiber)不能自由混合。

  • 如果 Fiber 的生命周期以 transfer 开始,它将永远无法 yield 或恢复控制权传递,只能完成或 transfer 回去。(它仍然可以恢复允许恢复的其他 fiber。)

  • 如果 Fiber 的生命周期以 resume 开始,它可以 yield 或 transfer 到另一个 Fiber,但只能以与它被放弃的方式兼容的方式接收控制权:如果它已 transfer,它只能被 transfer 回去,如果它已 yield,它只能被恢复回去。之后,它又可以 transfer 或 yield。

如果违反这些规则,将引发 FiberError

对于单独的 Fiber 设计,yield/resume 更易于使用(Fiber 只放弃控制权,它不需要考虑控制权交给了谁),而 transfer 对于复杂情况更灵活,允许构建相互依赖的 Fiber 任意图形。

示例

manager = nil # For local var to be visible inside worker block

# This fiber would be started with transfer
# It can't yield, and can't be resumed
worker = Fiber.new { |work|
  puts "Worker: starts"
  puts "Worker: Performed #{work.inspect}, transferring back"
  # Fiber.yield     # this would raise FiberError: attempt to yield on a not resumed fiber
  # manager.resume  # this would raise FiberError: attempt to resume a resumed fiber (double resume)
  manager.transfer(work.capitalize)
}

# This fiber would be started with resume
# It can yield or transfer, and can be transferred
# back or resumed
manager = Fiber.new {
  puts "Manager: starts"
  puts "Manager: transferring 'something' to worker"
  result = worker.transfer('something')
  puts "Manager: worker returned #{result.inspect}"
  # worker.resume    # this would raise FiberError: attempt to resume a transferring fiber
  Fiber.yield        # this is OK, the fiber transferred from and to, now it can yield
  puts "Manager: finished"
}

puts "Starting the manager"
manager.resume
puts "Resuming the manager"
# manager.transfer  # this would raise FiberError: attempt to transfer to a yielding fiber
manager.resume

产生

Starting the manager
Manager: starts
Manager: transferring 'something' to worker
Worker: starts
Worker: Performed "something", transferring back
Manager: worker returned "Something"
Resuming the manager
Manager: finished
static VALUE
rb_fiber_m_transfer(int argc, VALUE *argv, VALUE self)
{
    return rb_fiber_transfer_kw(self, argc, argv, rb_keyword_given_p());
}