类 Rinda::RingServer
一个 RingServer
允许通过 UDP 广播定位 Rinda::TupleSpace
。默认的服务定位使用以下步骤
-
一个
RingServer
开始监听网络广播 UDP 地址。 -
一个
RingFinger
发送一个包含DRb
URI
的 UDP 数据包,它将在该地址监听回复。 -
该
RingServer
接收 UDP 数据包并使用DRb
服务连接回提供的DRb
URI
。
一个 RingServer
需要一个 TupleSpace
ts = Rinda::TupleSpace.new rs = Rinda::RingServer.new
RingServer
也可以监听多播地址以接收公告。这允许在同一主机上运行多个 RingServer。要使用网络广播和多播
ts = Rinda::TupleSpace.new rs = Rinda::RingServer.new ts, %w[Socket::INADDR_ANY, 239.0.0.1 ff02::1]
公共类方法
在给定的 addresses
上的 port
上发布 ts
。
如果省略 addresses
,则只使用 UDP 广播地址。
addresses
可以包含多个地址。如果在 addresses
中给出了多播地址,则 RingServer
将监听多播查询。
如果您使用 IPv4 多播,您可能需要设置加入多播组的入站接口地址。
ts = Rinda::TupleSpace.new rs = Rinda::RingServer.new(ts, [['239.0.0.1', '9.5.1.1']])
您可以将地址设置为 Array
Object
。该 Array
的第一个元素是多播地址,第二个是入站接口地址。如果省略第二个,则使用 '0.0.0.0'。
如果您使用 IPv6 多播,您可能需要设置本地接口地址和入站接口索引。
rs = Rinda::RingServer.new(ts, [['ff02::1', '::1', 1]])
第一个元素是多播地址,第二个是入站接口地址。第三个是入站接口索引。
目前还没有简单的方法可以通过名称获取接口索引。
如果省略第二个参数,则使用 '::1'。如果省略第三个参数,则使用 0(默认接口)。
# File lib/rinda/ring.rb, line 94 def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT) @port = port if Integer === addresses then addresses, @port = [Socket::INADDR_ANY], addresses end @renewer = Renewer.new @ts = ts @sockets = [] addresses.each do |address| if Array === address make_socket(*address) else make_socket(address) end end @w_services = write_services @r_service = reply_service end
公共实例方法
从 TupleSpace
中提取查找元组,并将它们的 DRb
对象发送到本地 TupleSpace
的地址。
# File lib/rinda/ring.rb, line 218 def do_reply tuple = @ts.take([:lookup_ring, nil], @renewer) Thread.new { tuple[1].call(@ts) rescue nil} rescue end
从 msg
中提取响应 URI
,并将其添加到 TupleSpace
中,以便 reply_service
可以获取它以进行通知。
# File lib/rinda/ring.rb, line 193 def do_write(msg) Thread.new do begin tuple, sec = Marshal.load(msg) @ts.write(tuple, sec) rescue end end end
在 address
上创建套接字。
如果 address
是多播地址,则可以设置 interface_address
和 multicast_interface
作为可选参数。
创建的套接字绑定到 interface_address
。如果您使用 IPv4 多播,则 interface_address
的接口将用作入站接口。如果省略或 interface_address
为 nil,则使用 '0.0.0.0' 或 '::1'。
如果您使用 IPv6 多播,则 multicast_interface
将用作入站接口。multicast_interface
是网络接口索引。如果省略 multicast_interface
,则使用 0(默认接口)。
# File lib/rinda/ring.rb, line 132 def make_socket(address, interface_address=nil, multicast_interface=0) addrinfo = Addrinfo.udp(address, @port) socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol) if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then if Socket.const_defined?(:SO_REUSEPORT) then socket.setsockopt(:SOCKET, :SO_REUSEPORT, true) else socket.setsockopt(:SOCKET, :SO_REUSEADDR, true) end if addrinfo.ipv4_multicast? then interface_address = '0.0.0.0' if interface_address.nil? socket.bind(Addrinfo.udp(interface_address, @port)) mreq = IPAddr.new(addrinfo.ip_address).hton + IPAddr.new(interface_address).hton socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq) else interface_address = '::1' if interface_address.nil? socket.bind(Addrinfo.udp(interface_address, @port)) mreq = IPAddr.new(addrinfo.ip_address).hton + [multicast_interface].pack('I') socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq) end else socket.bind(addrinfo) end socket rescue socket = socket.close if socket raise ensure @sockets << socket if socket end
创建一个线程,从 TupleSpace
中通知等待的客户端。
# File lib/rinda/ring.rb, line 206 def reply_service Thread.new do loop do do_reply end end end
关闭 RingServer
# File lib/rinda/ring.rb, line 227 def shutdown @renewer.renew = false @w_services.each do |thread| thread.kill thread.join end @sockets.each do |socket| socket.close end @r_service.kill @r_service.join end
创建线程,获取 UDP 数据包并将其传递给 do_write
进行解码。
# File lib/rinda/ring.rb, line 178 def write_services @sockets.map do |s| Thread.new(s) do |socket| loop do msg = socket.recv(1024) do_write(msg) end end end end