类 Thread::Queue

Thread::Queue 实现多生产者、多消费者的队列。它在多线程编程中特别有用,当需要在多个线程之间安全地交换信息时。类 Thread::Queue 实现所有必要的锁定语义。

该类实现 FIFO(先进先出)类型的队列。在 FIFO 队列中,添加的第一个任务是第一个被检索到的任务。

示例

queue = Thread::Queue.new

producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end

consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end

consumer.join

公共类方法

Thread::Queue.new → empty_queue 点击切换源代码
Thread::Queue.new(enumerable) → queue

创建一个新的队列实例,可以选择使用 enumerable 的内容作为其初始状态。

示例

q = Thread::Queue.new
#=> #<Thread::Queue:0x00007ff7501110d0>
q.empty?
#=> true

q = Thread::Queue.new([1, 2, 3])
#=> #<Thread::Queue:0x00007ff7500ec500>
q.empty?
#=> false
q.pop
#=> 1
static VALUE
rb_queue_initialize(int argc, VALUE *argv, VALUE self)
{
    VALUE initial;
    struct rb_queue *q = queue_ptr(self);
    if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
        initial = rb_to_array(initial);
    }
    RB_OBJ_WRITE(self, queue_list(q), ary_buf_new());
    ccan_list_head_init(queue_waitq(q));
    if (argc == 1) {
        rb_ary_concat(q->que, initial);
    }
    return self;
}

公共实例方法

<<(object)

将给定的 object 推入队列。

别名:push
clear() 点击切换源代码

从队列中移除所有对象。

static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    rb_ary_clear(check_array(self, q->que));
    return self;
}
close 点击切换源代码

关闭队列。关闭的队列无法重新打开。

在调用 close 完成后,以下为真

  • closed? 将返回 true

  • close 将被忽略。

  • 调用 enq/push/<< 将引发 ClosedQueueError

  • empty? 为 false 时,调用 deq/pop/shift 将像往常一样从队列中返回一个对象。

  • empty? 为 true 时,deq(false) 将不会挂起线程,并将返回 nil。deq(true) 将引发 ThreadError

ClosedQueueError 继承自 StopIteration,因此您可以中断循环块。

示例

q = Thread::Queue.new
Thread.new{
  while e = q.deq # wait for nil to break loop
    # ...
  end
}
q.close
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    if (!queue_closed_p(self)) {
        FL_SET(self, QUEUE_CLOSED);

        wakeup_all(queue_waitq(q));
    }

    return self;
}
closed? 点击切换源代码

如果队列已关闭,则返回 true

static VALUE
rb_queue_closed_p(VALUE self)
{
    return RBOOL(queue_closed_p(self));
}
deq
别名:pop
empty? 点击切换源代码

如果队列为空,则返回 true

static VALUE
rb_queue_empty_p(VALUE self)
{
    return RBOOL(queue_length(self, queue_ptr(self)) == 0);
}
enq(object)

将给定的 object 推入队列。

别名:push
freeze 点击切换源代码

队列不能被冻结,因此此方法会引发异常。

Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
static VALUE
rb_queue_freeze(VALUE self)
{
    rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
    UNREACHABLE_RETURN(self);
}
length 点击切换源代码
size

返回队列的长度。

static VALUE
rb_queue_length(VALUE self)
{
    return LONG2NUM(queue_length(self, queue_ptr(self)));
}
也称为:size
num_waiting() 点击切换源代码

返回正在等待队列的线程数。

static VALUE
rb_queue_num_waiting(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    return INT2NUM(q->num_waiting);
}
pop(non_block=false, timeout: nil) 点击切换源代码

从队列中检索数据。

如果队列为空,则调用线程将被挂起,直到数据被推送到队列中。如果 non_block 为 true,则线程不会被挂起,并且会引发 ThreadError

如果 timeout 秒过去并且没有可用数据,则返回 nil。如果 timeout0,则立即返回。

# File thread_sync.rb, line 14
def pop(non_block = false, timeout: nil)
  if non_block && timeout
    raise ArgumentError, "can't set a timeout if non_block is enabled"
  end
  Primitive.rb_queue_pop(non_block, timeout)
end
也称为:deqshift
push(object) 点击切换源代码

将给定的 object 推入队列。

static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    return queue_do_push(self, queue_ptr(self), obj);
}
也称为:enq<<
shift
别名:pop
length
size

返回队列的长度。

别名:length