Fiber

Fiber 提供协作并发机制。

上下文切换

Fiber 执行用户提供的代码块。在执行期间,代码块可能会调用 Fiber.yieldFiber.transfer 切换到另一个 Fiber。Fiber#resume 用于从调用 Fiber.yield 的点继续执行。

#!/usr/bin/env ruby

puts "1: Start program."

f = Fiber.new do
  puts "3: Entered fiber."
  Fiber.yield
  puts "5: Resumed fiber."
end

puts "2: Resume fiber first time."
f.resume

puts "4: Resume fiber second time."
f.resume

puts "6: Finished."

此程序演示了 Fiber 的流控制。

调度程序

调度程序接口用于拦截阻塞操作。典型的实现将是类似于 EventMachineAsync 的 gem 的包装器。此设计在事件循环实现和应用程序代码之间提供了关注点分离。它还允许可以执行检测的分层调度程序。

为当前线程设置调度程序

Fiber.set_scheduler(MyScheduler.new)

当线程退出时,将隐式调用 set_scheduler

Fiber.set_scheduler(nil)

设计

调度程序接口被设计为用户代码和阻塞操作之间的一个不带观点的轻量级层。调度程序挂钩应避免转换或转换参数或返回值。理想情况下,用户代码中的完全相同的参数直接提供给调度程序挂钩,无需更改。

接口

这是您需要实现的接口。

class Scheduler
  # Wait for the specified process ID to exit.
  # This hook is optional.
  # @parameter pid [Integer] The process ID to wait for.
  # @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
  # @returns [Process::Status] A process status instance.
  def process_wait(pid, flags)
    Thread.new do
      Process::Status.wait(pid, flags)
    end.value
  end

  # Wait for the given io readiness to match the specified events within
  # the specified timeout.
  # @parameter event [Integer] A bit mask of `IO::READABLE`,
  #   `IO::WRITABLE` and `IO::PRIORITY`.
  # @parameter timeout [Numeric] The amount of time to wait for the event in seconds.
  # @returns [Integer] The subset of events that are ready.
  def io_wait(io, events, timeout)
  end

  # Read from the given io into the specified buffer.
  # WARNING: Experimental hook! Do not use in production code!
  # @parameter io [IO] The io to read from.
  # @parameter buffer [IO::Buffer] The buffer to read into.
  # @parameter length [Integer] The minimum amount to read.
  def io_read(io, buffer, length)
  end

  # Write from the given buffer into the specified IO.
  # WARNING: Experimental hook! Do not use in production code!
  # @parameter io [IO] The io to write to.
  # @parameter buffer [IO::Buffer] The buffer to write from.
  # @parameter length [Integer] The minimum amount to write.
  def io_write(io, buffer, length)
  end

  # Sleep the current task for the specified duration, or forever if not
  # specified.
  # @parameter duration [Numeric] The amount of time to sleep in seconds.
  def kernel_sleep(duration = nil)
  end

  # Execute the given block. If the block execution exceeds the given timeout,
  # the specified exception `klass` will be raised. Typically, only non-blocking
  # methods which enter the scheduler will raise such exceptions.
  # @parameter duration [Integer] The amount of time to wait, after which an exception will be raised.
  # @parameter klass [Class] The exception class to raise.
  # @parameter *arguments [Array] The arguments to send to the constructor of the exception.
  # @yields {...} The user code to execute.
  def timeout_after(duration, klass, *arguments, &block)
  end

  # Resolve hostname to an array of IP addresses.
  # This hook is optional.
  # @parameter hostname [String] Example: "www.ruby-lang.org".
  # @returns [Array] An array of IPv4 and/or IPv6 address strings that the hostname resolves to.
  def address_resolve(hostname)
  end

  # Block the calling fiber.
  # @parameter blocker [Object] What we are waiting on, informational only.
  # @parameter timeout [Numeric | Nil] The amount of time to wait for in seconds.
  # @returns [Boolean] Whether the blocking operation was successful or not.
  def block(blocker, timeout = nil)
  end

  # Unblock the specified fiber.
  # @parameter blocker [Object] What we are waiting on, informational only.
  # @parameter fiber [Fiber] The fiber to unblock.
  # @reentrant Thread safe.
  def unblock(blocker, fiber)
  end

  # Intercept the creation of a non-blocking fiber.
  # @returns [Fiber]
  def fiber(&block)
    Fiber.new(blocking: false, &block)
  end

  # Invoked when the thread exits.
  def close
    self.run
  end

  def run
    # Implement event loop here.
  end
end

将来可能会引入其他挂钩,我们将使用特性检测来启用这些挂钩。

非阻塞执行

调度程序挂钩仅在特殊的非阻塞执行上下文中使用。非阻塞执行上下文引入不确定性,因为调度程序挂钩的执行可能会在您的程序中引入上下文切换点。

光纤

光纤可用于创建非阻塞执行上下文。

Fiber.new do
  puts Fiber.current.blocking? # false

  # May invoke `Fiber.scheduler&.io_wait`.
  io.read(...)

  # May invoke `Fiber.scheduler&.io_wait`.
  io.write(...)

  # Will invoke `Fiber.scheduler&.kernel_sleep`.
  sleep(n)
end.resume

我们还引入了一种新方法,它简化了这些非阻塞光纤的创建

Fiber.schedule do
  puts Fiber.current.blocking? # false
end

此方法的目的是允许调度程序在内部决定何时启动光纤以及是否使用对称或非对称光纤的策略。

您还可以创建阻塞执行上下文

Fiber.new(blocking: true) do
  # Won't use the scheduler:
  sleep(n)
end

但是,除非您正在实现调度程序,否则通常应避免这样做。

IO

默认情况下,I/O 为非阻塞。并非所有操作系统都支持非阻塞 I/O。Windows 是一个著名的示例,其中套接字 I/O 可以是非阻塞的,但管道 I/O 是阻塞的。只要调度程序并且当前线程是非阻塞的,操作就会调用调度程序。

互斥锁

Mutex 类可以在非阻塞上下文中使用,并且是光纤特定的。

条件变量

ConditionVariable 类可以在非阻塞上下文中使用,并且是光纤特定的。

队列 / 大小队列

QueueSizedQueue 类可以在非阻塞上下文中使用,并且是光纤特定的。

Thread

Thread#join 操作可以在非阻塞上下文中使用,并且是光纤特定的。