类 Thread

线程是 Ruby 实现并发编程模型的方式。

需要多线程执行的程序非常适合使用 Ruby 的 Thread 类。

例如,我们可以使用 ::new 创建一个与主线程执行分离的新线程。

thr = Thread.new { puts "What's the big deal" }

然后,我们可以使用 join 暂停主线程的执行,并允许新线程完成。

thr.join #=> "What's the big deal"

如果我们在主线程终止之前没有调用 thr.join,那么包括 thr 在内的所有其他线程都将被杀死。

或者,您可以使用数组来一次处理多个线程,如下例所示

threads = []
threads << Thread.new { puts "What's the big deal" }
threads << Thread.new { 3.times { puts "Threads are fun!" } }

创建几个线程后,我们依次等待它们全部完成。

threads.each { |thr| thr.join }

要检索线程的最后一个值,请使用 value

thr = Thread.new { sleep 1; "Useful value" }
thr.value #=> "Useful value"

Thread 初始化

为了创建新线程,Ruby 提供了 ::new::start::fork。必须为这些方法中的每一个提供一个代码块,否则将引发 ThreadError

当子类化 Thread 类时,::start::fork 将忽略子类的 initialize 方法。否则,请确保在您的 initialize 方法中调用 super。

Thread 终止

对于终止线程,Ruby 提供了多种方法。

类方法 ::kill 用于退出给定线程。

thr = Thread.new { sleep }
Thread.kill(thr) # sends exit() to thr

或者,您可以使用实例方法 exit,或其任何别名 killterminate

thr.exit

Thread 状态

Ruby 提供了一些实例方法来查询给定线程的状态。要获取包含当前线程状态的字符串,请使用 status

thr = Thread.new { sleep }
thr.status # => "sleep"
thr.exit
thr.status # => false

您还可以使用 alive? 来判断线程是否正在运行或休眠,以及 stop? 来判断线程是否已死或休眠。

Thread 变量和作用域

由于线程是使用块创建的,因此与其他 Ruby 块的变量作用域规则相同。在此块中创建的任何局部变量仅对该线程可见。

Fiber-local 与 Thread-local

每个 Fiber 都有自己的 Thread#[] 存储桶。当您设置一个新的 Fiber-local 时,它只能在该 Fiber 中访问。为了说明

Thread.new {
  Thread.current[:foo] = "bar"
  Fiber.new {
    p Thread.current[:foo] # => nil
  }.resume
}.join

此示例使用 [] 来获取和 []= 来设置 Fiber-local,您还可以使用 keys 来列出给定线程的 Fiber-local,以及 key? 来检查 Fiber-local 是否存在。

对于 Thread-local,它们可以在整个线程作用域内访问。以下面的示例为例

Thread.new{
  Thread.current.thread_variable_set(:foo, 1)
  p Thread.current.thread_variable_get(:foo) # => 1
  Fiber.new{
    Thread.current.thread_variable_set(:foo, 2)
    p Thread.current.thread_variable_get(:foo) # => 2
  }.resume
  p Thread.current.thread_variable_get(:foo)   # => 2
}.join

您可以看到,Thread-local :foo 被带入 Fiber 中,并在线程结束时被更改为 2

此示例使用 thread_variable_set 来创建新的 Thread-local,以及 thread_variable_get 来引用它们。

还有 thread_variables 用于列出所有 Thread-local,以及 thread_variable? 用于检查给定 Thread-local 是否存在。

Exception 处理

当线程内部出现未处理的异常时,线程将终止。默认情况下,此异常不会传播到其他线程。异常将被存储,当另一个线程调用 valuejoin 时,异常将在该线程中重新抛出。

t = Thread.new{ raise 'something went wrong' }
t.value #=> RuntimeError: something went wrong

可以使用 Thread#raise 实例方法从线程外部引发异常,该方法接受与 Kernel#raise 相同的参数。

设置 Thread.abort_on_exception = true,Thread#abort_on_exception = true 或 $DEBUG = true 将导致随后在线程中引发的未处理异常自动在主线程中重新抛出。

随着类方法 ::handle_interrupt 的添加,您现在可以使用线程异步处理异常。

调度

Ruby 提供了几种方法来支持在您的程序中调度线程。

第一种方法是使用类方法 ::stop,将当前运行的线程置于休眠状态并调度另一个线程的执行。

一旦线程进入休眠状态,您可以使用实例方法 wakeup 将您的线程标记为可调度状态。

您还可以尝试 ::pass,它尝试将执行传递给另一个线程,但这取决于操作系统,运行的线程是否会切换。对于 priority 也是如此,它允许您提示线程调度程序在传递执行时您希望哪些线程优先。此方法也取决于操作系统,并且可能在某些平台上被忽略。

公共类方法

abort_on_exception → true 或 false 点击以切换源代码

返回全局“异常中止”条件的状态。

默认值为 false

当设置为 true 时,如果任何线程因异常而中止,则引发的异常将在主线程中重新抛出。

也可以通过全局 $DEBUG 标志或命令行选项 -d 指定。

另请参阅 ::abort_on_exception=

还有一个实例级方法可以为特定线程设置此值,请参阅 abort_on_exception

static VALUE
rb_thread_s_abort_exc(VALUE _)
{
    return RBOOL(GET_THREAD()->vm->thread_abort_on_exception);
}
abort_on_exception= boolean → true 或 false 点击以切换源代码

当设置为 true 时,如果任何线程因异常而中止,则引发的异常将在主线程中重新抛出。返回新状态。

Thread.abort_on_exception = true
t1 = Thread.new do
  puts  "In new thread"
  raise "Exception from thread"
end
sleep(1)
puts "not reached"

这将产生

In new thread
prog.rb:4: Exception from thread (RuntimeError)
 from prog.rb:2:in `initialize'
 from prog.rb:2:in `new'
 from prog.rb:2

另请参阅 ::abort_on_exception

还有一个实例级方法可以为特定线程设置此属性,请参阅 abort_on_exception=

static VALUE
rb_thread_s_abort_exc_set(VALUE self, VALUE val)
{
    GET_THREAD()->vm->thread_abort_on_exception = RTEST(val);
    return val;
}
current → thread 点击以切换源代码

返回当前正在执行的线程。

Thread.current   #=> #<Thread:0x401bdf4c run>
static VALUE
thread_s_current(VALUE klass)
{
    return rb_thread_current();
}
each_caller_location{ |loc| ... } → nil 点击以切换源代码

将当前执行堆栈的每个帧作为回溯位置对象进行 yield。

static VALUE
each_caller_location(VALUE unused)
{
    rb_ec_partial_backtrace_object(GET_EC(), 2, ALL_BACKTRACE_LINES, NULL, FALSE, TRUE);
    return Qnil;
}
exit → thread 点击以切换源代码

终止当前运行的线程并安排另一个线程运行。

如果此线程已被标记为要终止,则 ::exit 返回 Thread

如果这是主线程或最后一个线程,则退出进程。

static VALUE
rb_thread_exit(VALUE _)
{
    rb_thread_t *th = GET_THREAD();
    return rb_thread_kill(th->self);
}
start([args]*) {|args| block } → thread 点击以切换源代码
fork([args]*) {|args| block } → thread

基本上与 ::new 相同。但是,如果类 Thread 被子类化,则在该子类中调用 start 不会调用子类的 initialize 方法。

static VALUE
thread_start(VALUE klass, VALUE args)
{
    struct thread_create_params params = {
        .type = thread_invoke_type_proc,
        .args = args,
        .proc = rb_block_proc(),
    };
    return thread_create_core(rb_thread_alloc(klass), &params);
}
handle_interrupt(hash) { ... } → result of the block 点击以切换源代码

更改异步中断计时。

中断 指的是异步事件和由 Thread#raiseThread#kill、信号陷阱(尚未支持)和主线程终止(如果主线程终止,则所有其他线程将被终止)执行的相应过程。

给定的 hash 包含类似 ExceptionClass => :TimingSymbol 的键值对。其中 ExceptionClass 是由给定块处理的中断。TimingSymbol 可以是以下符号之一

:immediate

立即调用中断。

:on_blocking

阻塞操作期间调用中断。

:never

从不调用所有中断。

阻塞操作 指的是将阻塞调用线程的操作,例如读写。在 CRuby 实现中,阻塞操作 是任何在没有 GVL 的情况下执行的操作。

屏蔽的异步中断将被延迟,直到它们被启用。此方法类似于 sigprocmask(3)。

注意

异步中断很难使用。

如果您需要在线程之间进行通信,请考虑使用其他方式,例如 Queue

或者在充分理解此方法的情况下使用它们。

用法

在这个例子中,我们可以防御 Thread#raise 异常。

使用 :never TimingSymbol,RuntimeError 异常将始终在主线程的第一个块中被忽略。在第二个 ::handle_interrupt 块中,我们可以有目的地处理 RuntimeError 异常。

th = Thread.new do
  Thread.handle_interrupt(RuntimeError => :never) {
    begin
      # You can write resource allocation code safely.
      Thread.handle_interrupt(RuntimeError => :immediate) {
        # ...
      }
    ensure
      # You can write resource deallocation code safely.
    end
  }
end
Thread.pass
# ...
th.raise "stop"

当我们忽略 RuntimeError 异常时,编写资源分配代码是安全的。然后,ensure 块是我们可以安全地释放资源的地方。

防御 Timeout::Error

在下一个例子中,我们将防御 Timeout::Error 异常。这将有助于防止在正常 ensure 子句中发生 Timeout::Error 异常时泄漏资源。在这个例子中,我们使用标准库 Timeout 的帮助,来自 lib/timeout.rb

require 'timeout'
Thread.handle_interrupt(Timeout::Error => :never) {
  timeout(10){
    # Timeout::Error doesn't occur here
    Thread.handle_interrupt(Timeout::Error => :on_blocking) {
      # possible to be killed by Timeout::Error
      # while blocking operation
    }
    # Timeout::Error doesn't occur here
  }
}

timeout 块的第一部分,我们可以依赖于 Timeout::Error 被忽略。然后在 Timeout::Error => :on_blocking 块中,任何会阻塞调用线程的操作都可能引发 Timeout::Error 异常。

堆栈控制设置

可以堆叠多个级别的 ::handle_interrupt 块,以便一次控制多个 ExceptionClass 和 TimingSymbol。

Thread.handle_interrupt(FooError => :never) {
  Thread.handle_interrupt(BarError => :never) {
     # FooError and BarError are prohibited.
  }
}

与 ExceptionClass 的继承

将考虑从 ExceptionClass 参数继承的所有异常。

Thread.handle_interrupt(Exception => :never) {
  # all exceptions inherited from Exception are prohibited.
}

为了处理所有中断,请使用 Object 而不是 Exception 作为 ExceptionClass,因为 kill/terminate 中断不会由 Exception 处理。

static VALUE
rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
{
    VALUE mask = Qundef;
    rb_execution_context_t * volatile ec = GET_EC();
    rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
    volatile VALUE r = Qnil;
    enum ruby_tag_type state;

    if (!rb_block_given_p()) {
        rb_raise(rb_eArgError, "block is needed.");
    }

    mask_arg = rb_to_hash_type(mask_arg);

    if (OBJ_FROZEN(mask_arg) && rb_hash_compare_by_id_p(mask_arg)) {
        mask = Qnil;
    }

    rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask);

    if (UNDEF_P(mask)) {
        return rb_yield(Qnil);
    }

    if (!RTEST(mask)) {
        mask = mask_arg;
    }
    else if (RB_TYPE_P(mask, T_HASH)) {
        OBJ_FREEZE_RAW(mask);
    }

    rb_ary_push(th->pending_interrupt_mask_stack, mask);
    if (!rb_threadptr_pending_interrupt_empty_p(th)) {
        th->pending_interrupt_queue_checked = 0;
        RUBY_VM_SET_INTERRUPT(th->ec);
    }

    EC_PUSH_TAG(th->ec);
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        r = rb_yield(Qnil);
    }
    EC_POP_TAG();

    rb_ary_pop(th->pending_interrupt_mask_stack);
    if (!rb_threadptr_pending_interrupt_empty_p(th)) {
        th->pending_interrupt_queue_checked = 0;
        RUBY_VM_SET_INTERRUPT(th->ec);
    }

    RUBY_VM_CHECK_INTS(th->ec);

    if (state) {
        EC_JUMP_TAG(th->ec, state);
    }

    return r;
}
ignore_deadlock → true 或 false 点击切换源代码

返回全局“忽略死锁”条件的状态。默认值为 false,因此不会忽略死锁条件。

另请参见 ::ignore_deadlock=

static VALUE
rb_thread_s_ignore_deadlock(VALUE _)
{
    return RBOOL(GET_THREAD()->vm->thread_ignore_deadlock);
}
ignore_deadlock = boolean → true 或 false 点击切换源代码

返回新状态。当设置为 true 时,VM 不会检查死锁条件。仅当您的应用程序可以通过其他方式(例如信号)打破死锁条件时,才有用。

Thread.ignore_deadlock = true
queue = Thread::Queue.new

trap(:SIGUSR1){queue.push "Received signal"}

# raises fatal error unless ignoring deadlock
puts queue.pop

另请参见 ::ignore_deadlock

static VALUE
rb_thread_s_ignore_deadlock_set(VALUE self, VALUE val)
{
    GET_THREAD()->vm->thread_ignore_deadlock = RTEST(val);
    return val;
}
kill(thread) → thread 点击切换源代码

导致给定的 thread 退出,另请参见 Thread::exit

count = 0
a = Thread.new { loop { count += 1 } }
sleep(0.1)       #=> 0
Thread.kill(a)   #=> #<Thread:0x401b3d30 dead>
count            #=> 93947
a.alive?         #=> false
static VALUE
rb_thread_s_kill(VALUE obj, VALUE th)
{
    return rb_thread_kill(th);
}
list → array 点击切换源代码

返回一个 Thread 对象数组,这些对象代表所有可运行或已停止的线程。

Thread.new { sleep(200) }
Thread.new { 1000000.times {|i| i*i } }
Thread.new { Thread.stop }
Thread.list.each {|t| p t}

这将产生

#<Thread:0x401b3e84 sleep>
#<Thread:0x401b3f38 run>
#<Thread:0x401b3fb0 sleep>
#<Thread:0x401bdf4c run>
static VALUE
thread_list(VALUE _)
{
    return rb_thread_list();
}
main → thread 点击切换源代码

返回主线程。

static VALUE
rb_thread_s_main(VALUE klass)
{
    return rb_thread_main();
}
new { ... } → thread 点击切换源代码
new(*args, &proc) → thread
new(*args) { |args| ... } → thread

创建一个执行给定代码块的新线程。

传递给 ::new 的任何 args 都将传递给代码块。

arr = []
a, b, c = 1, 2, 3
Thread.new(a,b,c) { |d,e,f| arr << d << e << f }.join
arr #=> [1, 2, 3]

如果在没有代码块的情况下调用 ::new,则会引发 ThreadError 异常。

如果您要子类化 Thread,请确保在您的 initialize 方法中调用 super,否则将引发 ThreadError

static VALUE
thread_s_new(int argc, VALUE *argv, VALUE klass)
{
    rb_thread_t *th;
    VALUE thread = rb_thread_alloc(klass);

    if (GET_RACTOR()->threads.main->status == THREAD_KILLED) {
        rb_raise(rb_eThreadError, "can't alloc thread");
    }

    rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS);
    th = rb_thread_ptr(thread);
    if (!threadptr_initialized(th)) {
        rb_raise(rb_eThreadError, "uninitialized thread - check `%"PRIsVALUE"#initialize'",
                 klass);
    }
    return thread;
}
pass → nil 点击切换源代码

向线程调度器发出一个提示,以将执行传递给另一个线程。正在运行的线程可能会或可能不会切换,这取决于操作系统和处理器。

static VALUE
thread_s_pass(VALUE klass)
{
    rb_thread_schedule();
    return Qnil;
}
pending_interrupt?(error = nil) → true/false 点击切换源代码

返回异步队列是否为空。

由于 Thread::handle_interrupt 可用于延迟异步事件,因此此方法可用于确定是否存在任何延迟事件。

如果您发现此方法返回 true,则可以完成 :never 块。

例如,以下方法立即处理延迟的异步事件。

def Thread.kick_interrupt_immediately
  Thread.handle_interrupt(Object => :immediate) {
    Thread.pass
  }
end

如果提供了 error,则仅检查 error 类型的延迟事件。

用法

th = Thread.new{
  Thread.handle_interrupt(RuntimeError => :on_blocking){
    while true
      ...
      # reach safe point to invoke interrupt
      if Thread.pending_interrupt?
        Thread.handle_interrupt(Object => :immediate){}
      end
      ...
    end
  }
}
...
th.raise # stop thread

此示例也可以写成以下形式,您应该使用它来避免异步中断。

flag = true
th = Thread.new{
  Thread.handle_interrupt(RuntimeError => :on_blocking){
    while true
      ...
      # reach safe point to invoke interrupt
      break if flag == false
      ...
    end
  }
}
...
flag = false # stop thread
static VALUE
rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self)
{
    return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self);
}
report_on_exception → true or false 点击切换源代码

返回全局“异常报告”条件的状态。

从 Ruby 2.5 开始,默认值为 true

当此标志为 true 时创建的所有线程,如果异常终止线程,将在 $stderr 上报告一条消息。

Thread.new { 1.times { raise } }

将在 $stderr 上产生以下输出

#<Thread:...> terminated with exception (report_on_exception is true):
Traceback (most recent call last):
        2: from -e:1:in `block in <main>'
        1: from -e:1:in `times'

这样做是为了尽早捕获线程中的错误。在某些情况下,您可能不希望出现此输出。有多种方法可以避免额外的输出

  • 如果异常并非预期,最好的方法是修复导致异常的原因,使其不再发生。

  • 如果异常是预期的,最好在异常抛出位置附近进行捕获,而不是让它终止 Thread

  • 如果保证 Thread 将使用 Thread#joinThread#value 进行连接,那么在启动 Thread 时,使用 Thread.current.report_on_exception = false 禁用此报告是安全的。但是,这可能会在很晚的时候处理异常,或者如果由于父线程被阻塞等原因而从未连接 Thread,则根本不会处理异常。

另请参阅 ::report_on_exception=

还有一种实例级别的方法可以为特定线程设置此值,请参见 report_on_exception=

static VALUE
rb_thread_s_report_exc(VALUE _)
{
    return RBOOL(GET_THREAD()->vm->thread_report_on_exception);
}
report_on_exception= boolean → true or false 点击切换源代码

返回新状态。当设置为 true 时,之后创建的所有线程都将继承该条件,并在异常终止线程时在 $stderr 上报告一条消息。

Thread.report_on_exception = true
t1 = Thread.new do
  puts  "In new thread"
  raise "Exception from thread"
end
sleep(1)
puts "In the main thread"

这将产生

In new thread
#<Thread:...prog.rb:2> terminated with exception (report_on_exception is true):
Traceback (most recent call last):
prog.rb:4:in `block in <main>': Exception from thread (RuntimeError)
In the main thread

另请参见 ::report_on_exception

还有一种实例级别的方法可以为特定线程设置此值,请参见 report_on_exception=

static VALUE
rb_thread_s_report_exc_set(VALUE self, VALUE val)
{
    GET_THREAD()->vm->thread_report_on_exception = RTEST(val);
    return val;
}
start([args]*) {|args| block } → thread 点击以切换源代码
fork([args]*) {|args| block } → thread

基本上与 ::new 相同。但是,如果类 Thread 被子类化,则在该子类中调用 start 不会调用子类的 initialize 方法。

static VALUE
thread_start(VALUE klass, VALUE args)
{
    struct thread_create_params params = {
        .type = thread_invoke_type_proc,
        .args = args,
        .proc = rb_block_proc(),
    };
    return thread_create_core(rb_thread_alloc(klass), &params);
}
stop → nil 点击切换源代码

停止当前线程的执行,将其置于“休眠”状态,并安排另一个线程执行。

a = Thread.new { print "a"; Thread.stop; print "c" }
sleep 0.1 while a.status!='sleep'
print "b"
a.run
a.join
#=> "abc"
static VALUE
thread_stop(VALUE _)
{
    return rb_thread_stop();
}

公共实例方法

thr[sym] → obj or nil 点击切换源代码

属性引用 - 返回一个 fiber-local 变量的值(如果不在 Fiber 中,则为当前线程的根 fiber),使用符号或字符串名称。如果指定的变量不存在,则返回 nil

[
  Thread.new { Thread.current["name"] = "A" },
  Thread.new { Thread.current[:name]  = "B" },
  Thread.new { Thread.current["name"] = "C" }
].each do |th|
  th.join
  puts "#{th.inspect}: #{th[:name]}"
end

这将产生

#<Thread:0x00000002a54220 dead>: A
#<Thread:0x00000002a541a8 dead>: B
#<Thread:0x00000002a54130 dead>: C

Thread#[]Thread#[]= 不是线程本地,而是 fiber-local。在 Ruby 1.8 中不存在这种混淆,因为 fiber 仅在 Ruby 1.9 之后才可用。Ruby 1.9 选择让这些方法表现为 fiber-local 以节省以下用于动态作用域的习惯用法。

def meth(newvalue)
  begin
    oldvalue = Thread.current[:name]
    Thread.current[:name] = newvalue
    yield
  ensure
    Thread.current[:name] = oldvalue
  end
end

如果这些方法是线程本地,并且给定的代码块切换了 fiber,则该习惯用法可能无法作为动态作用域工作。

f = Fiber.new {
  meth(1) {
    Fiber.yield
  }
}
meth(2) {
  f.resume
}
f.resume
p Thread.current[:name]
#=> nil if fiber-local
#=> 2 if thread-local (The value 2 is leaked to outside of meth method.)

对于线程本地变量,请参见 thread_variable_getthread_variable_set

static VALUE
rb_thread_aref(VALUE thread, VALUE key)
{
    ID id = rb_check_id(&key);
    if (!id) return Qnil;
    return rb_thread_local_aref(thread, id);
}
thr[sym] = obj → obj 点击切换源代码

属性赋值 - 使用符号或字符串设置或创建 fiber-local 变量的值。

另请参见 Thread#[]

对于线程本地变量,请参见 thread_variable_setthread_variable_get

static VALUE
rb_thread_aset(VALUE self, VALUE id, VALUE val)
{
    return rb_thread_local_aset(self, rb_to_id(id), val);
}
abort_on_exception → true 或 false 点击以切换源代码

返回此 thr 的线程本地“异常时中止”条件的状态。

默认值为 false

另请参见 abort_on_exception=

还有一种类级别方法可以为所有线程设置此值,请参见 ::abort_on_exception

static VALUE
rb_thread_abort_exc(VALUE thread)
{
    return RBOOL(rb_thread_ptr(thread)->abort_on_exception);
}
abort_on_exception= boolean → true 或 false 点击以切换源代码

当设置为 true 时,如果此 thr 被异常中止,则在主线程中将重新引发所引发的异常。

另请参见 abort_on_exception

还有一个类级别方法可以为所有线程设置此值,请参见 ::abort_on_exception=

static VALUE
rb_thread_abort_exc_set(VALUE thread, VALUE val)
{
    rb_thread_ptr(thread)->abort_on_exception = RTEST(val);
    return val;
}
add_trace_func(proc) → proc 点击以切换源代码

proc添加为跟踪处理程序。

请参见 Thread#set_trace_funcKernel#set_trace_func

static VALUE
thread_add_trace_func_m(VALUE obj, VALUE trace)
{
    thread_add_trace_func(GET_EC(), rb_thread_ptr(obj), trace);
    return trace;
}
alive? → true 或 false 点击以切换源代码

如果thr正在运行或休眠,则返回true

thr = Thread.new { }
thr.join                #=> #<Thread:0x401b3fb0 dead>
Thread.current.alive?   #=> true
thr.alive?              #=> false

另请参见 stop?status

static VALUE
rb_thread_alive_p(VALUE thread)
{
    return RBOOL(!thread_finished(rb_thread_ptr(thread)));
}
backtrace → 数组或 nil 点击以切换源代码

返回目标线程的当前回溯。

static VALUE
rb_thread_backtrace_m(int argc, VALUE *argv, VALUE thval)
{
    return rb_vm_thread_backtrace(argc, argv, thval);
}
backtrace_locations(*args) → 数组或 nil 点击以切换源代码

返回目标线程的执行堆栈 - 包含回溯位置对象的数组。

有关更多信息,请参见 Thread::Backtrace::Location

此方法的行为类似于 Kernel#caller_locations,只是它适用于特定线程。

static VALUE
rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval)
{
    return rb_vm_thread_backtrace_locations(argc, argv, thval);
}
exit → thr

终止thr并安排另一个线程运行,返回已终止的 Thread。如果这是主线程或最后一个线程,则退出进程。

别名:kill
fetch(sym) → obj 点击以切换源代码
fetch(sym) { } → obj
fetch(sym, default) → obj

返回给定键的纤程局部变量。如果找不到键,则有几种选择:如果没有其他参数,它将引发 KeyError 异常;如果给出default,则将返回该值;如果指定了可选代码块,则将运行该代码块并返回其结果。请参见 Thread#[]Hash#fetch

static VALUE
rb_thread_fetch(int argc, VALUE *argv, VALUE self)
{
    VALUE key, val;
    ID id;
    rb_thread_t *target_th = rb_thread_ptr(self);
    int block_given;

    rb_check_arity(argc, 1, 2);
    key = argv[0];

    block_given = rb_block_given_p();
    if (block_given && argc == 2) {
        rb_warn("block supersedes default value argument");
    }

    id = rb_check_id(&key);

    if (id == recursive_key) {
        return target_th->ec->local_storage_recursive_hash;
    }
    else if (id && target_th->ec->local_storage &&
             rb_id_table_lookup(target_th->ec->local_storage, id, &val)) {
        return val;
    }
    else if (block_given) {
        return rb_yield(key);
    }
    else if (argc == 1) {
        rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key);
    }
    else {
        return argv[1];
    }
}
group → thgrp 或 nil 点击以切换源代码

返回包含给定线程的 ThreadGroup

Thread.main.group   #=> #<ThreadGroup:0x4029d914>
VALUE
rb_thread_group(VALUE thread)
{
    return rb_thread_ptr(thread)->thgroup;
}
inspect
别名:to_s
join → thr 点击以切换源代码
join(limit) → thr

调用线程将暂停执行并运行此thr

直到thr退出或给定的limit秒过去才会返回。

如果时间限制到期,将返回nil,否则返回thr

当主程序退出时,所有未加入的线程将被杀死。

如果thr之前已引发异常,并且::abort_on_exception或$DEBUG标志未设置(因此异常尚未处理),则此时将处理该异常。

a = Thread.new { print "a"; sleep(10); print "b"; print "c" }
x = Thread.new { print "x"; Thread.pass; print "y"; print "z" }
x.join # Let thread x finish, thread a will be killed on exit.
#=> "axyz"

以下示例说明了limit参数。

y = Thread.new { 4.times { sleep 0.1; puts 'tick... ' }}
puts "Waiting" until y.join(0.15)

这将产生

tick...
Waiting
tick...
Waiting
tick...
tick...
static VALUE
thread_join_m(int argc, VALUE *argv, VALUE self)
{
    VALUE timeout = Qnil;
    rb_hrtime_t rel = 0, *limit = 0;

    if (rb_check_arity(argc, 0, 1)) {
        timeout = argv[0];
    }

    // Convert the timeout eagerly, so it's always converted and deterministic
    /*
     * This supports INFINITY and negative values, so we can't use
     * rb_time_interval right now...
     */
    if (NIL_P(timeout)) {
        /* unlimited */
    }
    else if (FIXNUM_P(timeout)) {
        rel = rb_sec2hrtime(NUM2TIMET(timeout));
        limit = &rel;
    }
    else {
        limit = double2hrtime(&rel, rb_num2dbl(timeout));
    }

    return thread_join(rb_thread_ptr(self), timeout, limit);
}
key?(sym) → true or false click to toggle source

如果给定的字符串(或符号)作为纤维局部变量存在,则返回true

me = Thread.current
me[:oliver] = "a"
me.key?(:oliver)    #=> true
me.key?(:stanley)   #=> false
static VALUE
rb_thread_key_p(VALUE self, VALUE key)
{
    VALUE val;
    ID id = rb_check_id(&key);
    struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage;

    if (!id || local_storage == NULL) {
        return Qfalse;
    }
    return RBOOL(rb_id_table_lookup(local_storage, id, &val));
}
keys → array click to toggle source

返回一个包含纤维局部变量名称(作为符号)的数组。

thr = Thread.new do
  Thread.current[:cat] = 'meow'
  Thread.current["dog"] = 'woof'
end
thr.join   #=> #<Thread:0x401b3f10 dead>
thr.keys   #=> [:dog, :cat]
static VALUE
rb_thread_keys(VALUE self)
{
    struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
    VALUE ary = rb_ary_new();

    if (local_storage) {
        rb_id_table_foreach(local_storage, thread_keys_i, (void *)ary);
    }
    return ary;
}
kill → thr click to toggle source

终止thr并安排另一个线程运行,返回已终止的 Thread。如果这是主线程或最后一个线程,则退出进程。

VALUE
rb_thread_kill(VALUE thread)
{
    rb_thread_t *target_th = rb_thread_ptr(thread);

    if (target_th->to_kill || target_th->status == THREAD_KILLED) {
        return thread;
    }
    if (target_th == target_th->vm->ractor.main_thread) {
        rb_exit(EXIT_SUCCESS);
    }

    RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(target_th));

    if (target_th == GET_THREAD()) {
        /* kill myself immediately */
        rb_threadptr_to_kill(target_th);
    }
    else {
        threadptr_check_pending_interrupt_queue(target_th);
        rb_threadptr_pending_interrupt_enque(target_th, RUBY_FATAL_THREAD_KILLED);
        rb_threadptr_interrupt(target_th);
    }

    return thread;
}
也称为:terminateexit
name → string click to toggle source

显示线程的名称。

static VALUE
rb_thread_getname(VALUE thread)
{
    return rb_thread_ptr(thread)->name;
}
name=(name) → string click to toggle source

将给定名称设置为 Ruby 线程。在某些平台上,它可能会将名称设置为 pthread 和/或内核。

static VALUE
rb_thread_setname(VALUE thread, VALUE name)
{
    rb_thread_t *target_th = rb_thread_ptr(thread);

    if (!NIL_P(name)) {
        rb_encoding *enc;
        StringValueCStr(name);
        enc = rb_enc_get(name);
        if (!rb_enc_asciicompat(enc)) {
            rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
                     rb_enc_name(enc));
        }
        name = rb_str_new_frozen(name);
    }
    target_th->name = name;
    if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) {
        native_set_another_thread_name(target_th->nt->thread_id, name);
    }
    return name;
}
native_thread_id → integer click to toggle source

返回 Ruby 线程使用的本机线程 ID。

ID 依赖于操作系统。(不是 pthread_self(3) 返回的 POSIX 线程 ID)

  • 在 Linux 上,它是 gettid(2) 返回的 TID。

  • 在 macOS 上,它是 pthread_threadid_np(3) 返回的线程的系统范围唯一的整数 ID。

  • 在 FreeBSD 上,它是 pthread_getthreadid_np(3) 返回的线程的唯一整数 ID。

  • 在 Windows 上,它是 GetThreadId() 返回的线程标识符。

  • 在其他平台上,它会引发NotImplementedError

注意:如果线程尚未关联或已与本机线程解除关联,则它将返回nil。如果 Ruby 实现使用 M:N 线程模型,则 ID 可能会根据时间而改变。

static VALUE
rb_thread_native_thread_id(VALUE thread)
{
    rb_thread_t *target_th = rb_thread_ptr(thread);
    if (rb_threadptr_dead(target_th)) return Qnil;
    return native_thread_native_thread_id(target_th);
}
pending_interrupt?(error = nil) → true/false 点击切换源代码

返回目标线程的异步队列是否为空。

如果提供了 error,则仅检查 error 类型的延迟事件。

有关更多信息,请参见 ::pending_interrupt?

static VALUE
rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread)
{
    rb_thread_t *target_th = rb_thread_ptr(target_thread);

    if (!target_th->pending_interrupt_queue) {
        return Qfalse;
    }
    if (rb_threadptr_pending_interrupt_empty_p(target_th)) {
        return Qfalse;
    }
    if (rb_check_arity(argc, 0, 1)) {
        VALUE err = argv[0];
        if (!rb_obj_is_kind_of(err, rb_cModule)) {
            rb_raise(rb_eTypeError, "class or module required for rescue clause");
        }
        return RBOOL(rb_threadptr_pending_interrupt_include_p(target_th, err));
    }
    else {
        return Qtrue;
    }
}
priority → integer 点击以切换源代码

返回thr的优先级。默认情况下,继承自创建新线程的当前线程,或者初始主线程为零;高优先级线程比低优先级线程运行更频繁(但低优先级线程也可以运行)。

这只是 Ruby 线程调度程序的提示。它可能在某些平台上被忽略。

Thread.current.priority   #=> 0
static VALUE
rb_thread_priority(VALUE thread)
{
    return INT2NUM(rb_thread_ptr(thread)->priority);
}
priority= integer → thr 点击以切换源代码

thr的优先级设置为integer。高优先级线程比低优先级线程运行更频繁(但低优先级线程也可以运行)。

这只是 Ruby 线程调度程序的提示。它可能在某些平台上被忽略。

count1 = count2 = 0
a = Thread.new do
      loop { count1 += 1 }
    end
a.priority = -1

b = Thread.new do
      loop { count2 += 1 }
    end
b.priority = -2
sleep 1   #=> 1
count1    #=> 622504
count2    #=> 5832
static VALUE
rb_thread_priority_set(VALUE thread, VALUE prio)
{
    rb_thread_t *target_th = rb_thread_ptr(thread);
    int priority;

#if USE_NATIVE_THREAD_PRIORITY
    target_th->priority = NUM2INT(prio);
    native_thread_apply_priority(th);
#else
    priority = NUM2INT(prio);
    if (priority > RUBY_THREAD_PRIORITY_MAX) {
        priority = RUBY_THREAD_PRIORITY_MAX;
    }
    else if (priority < RUBY_THREAD_PRIORITY_MIN) {
        priority = RUBY_THREAD_PRIORITY_MIN;
    }
    target_th->priority = (int8_t)priority;
#endif
    return INT2NUM(target_th->priority);
}
raise 点击以切换源代码
raise(string)
raise(exception [, string [, array]])

从给定线程引发异常。调用者不必是thr。有关更多信息,请参见 Kernel#raise

Thread.abort_on_exception = true
a = Thread.new { sleep(200) }
a.raise("Gotcha")

这将产生

prog.rb:3: Gotcha (RuntimeError)
 from prog.rb:2:in `initialize'
 from prog.rb:2:in `new'
 from prog.rb:2
static VALUE
thread_raise_m(int argc, VALUE *argv, VALUE self)
{
    rb_thread_t *target_th = rb_thread_ptr(self);
    const rb_thread_t *current_th = GET_THREAD();

    threadptr_check_pending_interrupt_queue(target_th);
    rb_threadptr_raise(target_th, argc, argv);

    /* To perform Thread.current.raise as Kernel.raise */
    if (current_th == target_th) {
        RUBY_VM_CHECK_INTS(target_th->ec);
    }
    return Qnil;
}
report_on_exception → true or false 点击切换源代码

返回此thr的线程本地“异常报告”条件的状态。

创建 Thread 时的默认值为全局标志 Thread.report_on_exception 的值。

另请参见 report_on_exception=

还有一个类级方法可以为所有新线程设置此值,请参见 ::report_on_exception=

static VALUE
rb_thread_report_exc(VALUE thread)
{
    return RBOOL(rb_thread_ptr(thread)->report_on_exception);
}
report_on_exception= boolean → true or false 点击切换源代码

当设置为true时,如果异常终止此thr,则会在 $stderr 上打印一条消息。有关详细信息,请参见 ::report_on_exception

另请参见 report_on_exception

还有一个类级方法可以为所有新线程设置此值,请参见 ::report_on_exception=

static VALUE
rb_thread_report_exc_set(VALUE thread, VALUE val)
{
    rb_thread_ptr(thread)->report_on_exception = RTEST(val);
    return val;
}
run → thr 点击以切换源代码

唤醒thr,使其有资格进行调度。

a = Thread.new { puts "a"; Thread.stop; puts "c" }
sleep 0.1 while a.status!='sleep'
puts "Got here"
a.run
a.join

这将产生

a
Got here
c

另请参见实例方法 wakeup

VALUE
rb_thread_run(VALUE thread)
{
    rb_thread_wakeup(thread);
    rb_thread_schedule();
    return thread;
}
set_trace_func(proc) → proc 点击以切换源代码
set_trace_func(nil) → nil

thr上建立proc作为跟踪的处理程序,或者如果参数为nil,则禁用跟踪。

参见 Kernel#set_trace_func

static VALUE
thread_set_trace_func_m(VALUE target_thread, VALUE trace)
{
    rb_execution_context_t *ec = GET_EC();
    rb_thread_t *target_th = rb_thread_ptr(target_thread);

    rb_threadptr_remove_event_hook(ec, target_th, call_trace_func, Qundef);

    if (NIL_P(trace)) {
        return Qnil;
    }
    else {
        thread_add_trace_func(ec, target_th, trace);
        return trace;
    }
}
status → string, false or nil 点击以切换源代码

返回thr的状态。

"sleep"

如果此线程正在休眠或等待 I/O,则返回。

"运行"

当此线程正在执行时

"中止"

如果此线程正在中止

false

当此线程正常终止时

nil

如果以异常终止。

a = Thread.new { raise("die now") }
b = Thread.new { Thread.stop }
c = Thread.new { Thread.exit }
d = Thread.new { sleep }
d.kill                  #=> #<Thread:0x401b3678 aborting>
a.status                #=> nil
b.status                #=> "sleep"
c.status                #=> false
d.status                #=> "aborting"
Thread.current.status   #=> "run"

另请参阅实例方法 alive?stop?

static VALUE
rb_thread_status(VALUE thread)
{
    rb_thread_t *target_th = rb_thread_ptr(thread);

    if (rb_threadptr_dead(target_th)) {
        if (!NIL_P(target_th->ec->errinfo) &&
            !FIXNUM_P(target_th->ec->errinfo)) {
            return Qnil;
        }
        else {
            return Qfalse;
        }
    }
    else {
        return rb_str_new2(thread_status_name(target_th, FALSE));
    }
}
stop? → true 或 false 点击切换源代码

如果 thr 已死或处于休眠状态,则返回 true

a = Thread.new { Thread.stop }
b = Thread.current
a.stop?   #=> true
b.stop?   #=> false

另请参阅 alive?status.

static VALUE
rb_thread_stop_p(VALUE thread)
{
    rb_thread_t *th = rb_thread_ptr(thread);

    if (rb_threadptr_dead(th)) {
        return Qtrue;
    }
    return RBOOL(th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER);
}
terminate → thr

终止thr并安排另一个线程运行,返回已终止的 Thread。如果这是主线程或最后一个线程,则退出进程。

别名:kill
thread_variable?(key) → true 或 false 点击切换源代码

如果给定的字符串(或符号)作为线程局部变量存在,则返回 true

me = Thread.current
me.thread_variable_set(:oliver, "a")
me.thread_variable?(:oliver)    #=> true
me.thread_variable?(:stanley)   #=> false

请注意,这些不是纤程局部变量。有关更多详细信息,请参阅 Thread#[]Thread#thread_variable_get

static VALUE
rb_thread_variable_p(VALUE thread, VALUE key)
{
    VALUE locals;

    if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
        return Qfalse;
    }
    locals = rb_thread_local_storage(thread);

    return RBOOL(rb_hash_lookup(locals, rb_to_symbol(key)) != Qnil);
}
thread_variable_get(key) → obj 或 nil 点击切换源代码

返回已设置的线程局部变量的值。请注意,这些与纤程局部值不同。对于纤程局部值,请参阅 Thread#[]Thread#[]=

Thread 局部值随线程一起传递,并且不尊重纤程。例如

Thread.new {
  Thread.current.thread_variable_set("foo", "bar") # set a thread local
  Thread.current["foo"] = "bar"                    # set a fiber local

  Fiber.new {
    Fiber.yield [
      Thread.current.thread_variable_get("foo"), # get the thread local
      Thread.current["foo"],                     # get the fiber local
    ]
  }.resume
}.join.value # => ['bar', nil]

线程局部返回的值为“bar”,而纤程局部返回的值为 nil。纤程在同一个线程中执行,因此线程局部值可用。

static VALUE
rb_thread_variable_get(VALUE thread, VALUE key)
{
    VALUE locals;

    if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
        return Qnil;
    }
    locals = rb_thread_local_storage(thread);
    return rb_hash_aref(locals, rb_to_symbol(key));
}
thread_variable_set(key, value) 点击切换源代码

key 的线程局部设置为 value。请注意,这些是线程局部的,而不是纤程局部的。有关更多信息,请参阅 Thread#thread_variable_getThread#[]

static VALUE
rb_thread_variable_set(VALUE thread, VALUE key, VALUE val)
{
    VALUE locals;

    if (OBJ_FROZEN(thread)) {
        rb_frozen_error_raise(thread, "can't modify frozen thread locals");
    }

    locals = rb_thread_local_storage(thread);
    return rb_hash_aset(locals, rb_to_symbol(key), val);
}
thread_variables → array 点击切换源代码

返回线程局部变量名称的数组(作为符号)。

thr = Thread.new do
  Thread.current.thread_variable_set(:cat, 'meow')
  Thread.current.thread_variable_set("dog", 'woof')
end
thr.join               #=> #<Thread:0x401b3f10 dead>
thr.thread_variables   #=> [:dog, :cat]

请注意,这些不是纤程局部变量。有关更多详细信息,请参阅 Thread#[]Thread#thread_variable_get

static VALUE
rb_thread_variables(VALUE thread)
{
    VALUE locals;
    VALUE ary;

    ary = rb_ary_new();
    if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
        return ary;
    }
    locals = rb_thread_local_storage(thread);
    rb_hash_foreach(locals, keys_i, ary);

    return ary;
}
to_s → string 点击切换源代码

thr的名称、ID和状态转储到字符串中。

static VALUE
rb_thread_to_s(VALUE thread)
{
    VALUE cname = rb_class_path(rb_obj_class(thread));
    rb_thread_t *target_th = rb_thread_ptr(thread);
    const char *status;
    VALUE str, loc;

    status = thread_status_name(target_th, TRUE);
    str = rb_sprintf("#<%"PRIsVALUE":%p", cname, (void *)thread);
    if (!NIL_P(target_th->name)) {
        rb_str_catf(str, "@%"PRIsVALUE, target_th->name);
    }
    if ((loc = threadptr_invoke_proc_location(target_th)) != Qnil) {
        rb_str_catf(str, " %"PRIsVALUE":%"PRIsVALUE,
                    RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
    }
    rb_str_catf(str, " %s>", status);

    return str;
}
别名:inspect
value → obj 点击切换源代码

使用join等待thr完成,并返回其值或引发终止线程的异常。

a = Thread.new { 2 + 2 }
a.value   #=> 4

b = Thread.new { raise 'something went wrong' }
b.value   #=> RuntimeError: something went wrong
static VALUE
thread_value(VALUE self)
{
    rb_thread_t *th = rb_thread_ptr(self);
    thread_join(th, Qnil, 0);
    if (UNDEF_P(th->value)) {
        // If the thread is dead because we forked th->value is still Qundef.
        return Qnil;
    }
    return th->value;
}
wakeup → thr 点击切换源代码

将给定线程标记为可调度,但它可能仍然被I/O阻塞。

注意:这不会调用调度程序,有关更多信息,请参见run

c = Thread.new { Thread.stop; puts "hey!" }
sleep 0.1 while c.status!='sleep'
c.wakeup
c.join
#=> "hey!"
VALUE
rb_thread_wakeup(VALUE thread)
{
    if (!RTEST(rb_thread_wakeup_alive(thread))) {
        rb_raise(rb_eThreadError, "killed thread");
    }
    return thread;
}