类 Rinda::TupleSpace

元组空间管理对其中包含的元组的访问,确保满足互斥要求。

对于 write、take、move、read 和 notify 方法的 sec 选项,可以是秒数或 Renewer 对象。

公共类方法

new(period=60) 点击切换源代码

创建一个新的 TupleSpaceperiod 用于控制在对 TupleSpace 进行修改后多久检查一次死元组。

如果在上次修改后的 period 秒内没有找到死元组,TupleSpace 将停止查找死元组。

调用超类方法 MonitorMixin::new
# File lib/rinda/tuplespace.rb, line 437
def initialize(period=60)
  super()
  @bag = TupleBag.new
  @read_waiter = TupleBag.new
  @take_waiter = TupleBag.new
  @notify_waiter = TupleBag.new
  @period = period
  @keeper = nil
end

公共实例方法

move(port, tuple, sec=nil) { |template| ... } 点击切换源代码

tuple 移动到 port

# File lib/rinda/tuplespace.rb, line 484
def move(port, tuple, sec=nil)
  template = WaitTemplateEntry.new(self, tuple, sec)
  yield(template) if block_given?
  synchronize do
    entry = @bag.find(template)
    if entry
      port.push(entry.value) if port
      @bag.delete(entry)
      notify_event('take', entry.value)
      return port ? nil : entry.value
    end
    raise RequestExpiredError if template.expired?

    begin
      @take_waiter.push(template)
      start_keeper if template.expires
      while true
        raise RequestCanceledError if template.canceled?
        raise RequestExpiredError if template.expired?
        entry = @bag.find(template)
        if entry
          port.push(entry.value) if port
          @bag.delete(entry)
          notify_event('take', entry.value)
          return port ? nil : entry.value
        end
        template.wait
      end
    ensure
      @take_waiter.delete(template)
    end
  end
end
notify(event, tuple, sec=nil) 点击切换源代码

注册 event 的通知。返回一个 NotifyTemplateEntry。有关如何监听通知的示例,请参见 NotifyTemplateEntry

event 可以是

‘write’

添加了一个元组

‘take’

元组被获取或移动

‘delete’

元组在被覆盖或过期后丢失

NotifyTemplateEntry 过期时,TupleSpace 也会通知您 ‘close’ 事件。

# File lib/rinda/tuplespace.rb, line 567
def notify(event, tuple, sec=nil)
  template = NotifyTemplateEntry.new(self, event, tuple, sec)
  synchronize do
    @notify_waiter.push(template)
  end
  template
end
read(tuple, sec=nil) { |template| ... } 点击切换源代码

读取 tuple,但不删除它。

# File lib/rinda/tuplespace.rb, line 521
def read(tuple, sec=nil)
  template = WaitTemplateEntry.new(self, tuple, sec)
  yield(template) if block_given?
  synchronize do
    entry = @bag.find(template)
    return entry.value if entry
    raise RequestExpiredError if template.expired?

    begin
      @read_waiter.push(template)
      start_keeper if template.expires
      template.wait
      raise RequestCanceledError if template.canceled?
      raise RequestExpiredError if template.expired?
      return template.found
    ensure
      @read_waiter.delete(template)
    end
  end
end
read_all(tuple) 点击切换源代码

返回所有与 tuple 匹配的元组。不删除找到的元组。

# File lib/rinda/tuplespace.rb, line 545
def read_all(tuple)
  template = WaitTemplateEntry.new(self, tuple, nil)
  synchronize do
    entry = @bag.find_all(template)
    entry.collect do |e|
      e.value
    end
  end
end
take(tuple, sec=nil, &block) 点击切换源代码

移除 tuple

# File lib/rinda/tuplespace.rb, line 477
def take(tuple, sec=nil, &block)
  move(nil, tuple, sec, &block)
end
write(tuple, sec=nil) 点击切换源代码

添加 tuple

# File lib/rinda/tuplespace.rb, line 450
def write(tuple, sec=nil)
  entry = create_entry(tuple, sec)
  synchronize do
    if entry.expired?
      @read_waiter.find_all_template(entry).each do |template|
        template.read(tuple)
      end
      notify_event('write', entry.value)
      notify_event('delete', entry.value)
    else
      @bag.push(entry)
      start_keeper if entry.expires
      @read_waiter.find_all_template(entry).each do |template|
        template.read(tuple)
      end
      @take_waiter.find_all_template(entry).each do |template|
        template.signal
      end
      notify_event('write', entry.value)
    end
  end
  entry
end

私有实例方法

create_entry(tuple, sec) 点击切换源代码
# File lib/rinda/tuplespace.rb, line 577
def create_entry(tuple, sec)
  TupleEntry.new(tuple, sec)
end
keep_clean() 点击切换源代码

移除失效的元组。

# File lib/rinda/tuplespace.rb, line 584
def keep_clean
  synchronize do
    @read_waiter.delete_unless_alive.each do |e|
      e.signal
    end
    @take_waiter.delete_unless_alive.each do |e|
      e.signal
    end
    @notify_waiter.delete_unless_alive.each do |e|
      e.notify(['close'])
    end
    @bag.delete_unless_alive.each do |e|
      notify_event('delete', e.value)
    end
  end
end
need_keeper?() 点击切换源代码

检查元组空间是否需要清理。

# File lib/rinda/tuplespace.rb, line 631
def need_keeper?
  return true if @bag.has_expires?
  return true if @read_waiter.has_expires?
  return true if @take_waiter.has_expires?
  return true if @notify_waiter.has_expires?
end
notify_event(event, tuple) 点击切换源代码

通知所有为 event 注册的监听器,tuple 的状态发生了变化。

# File lib/rinda/tuplespace.rb, line 605
def notify_event(event, tuple)
  ev = [event, tuple]
  @notify_waiter.find_all_template(ev).each do |template|
    template.notify(ev)
  end
end
start_keeper() 点击切换源代码

创建一个线程来扫描元组空间以查找过期的元组。

# File lib/rinda/tuplespace.rb, line 615
def start_keeper
  return if @keeper && @keeper.alive?
  @keeper = Thread.new do
    while true
      sleep(@period)
      synchronize do
        break unless need_keeper?
        keep_clean
      end
    end
  end
end