Class: Opal::BuilderScheduler::Prefork::ForkSet

Inherits:
Array
  • Object
show all
Defined in:
opal/lib/opal/builder_scheduler/prefork.rb

Instance Method Summary collapse

Constructor Details

#initialize(count, &block) ⇒ ForkSet

Returns a new instance of ForkSet.

[View source]

76
77
78
79
80
81
82
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 76

def initialize(count, &block)
  super([])

  @count, @block = count, block

  create_fork
end

Instance Method Details

#closeObject

[View source]

123
124
125
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 123

def close
  each(&:close)
end

#create_forkObject

[View source]

115
116
117
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 115

def create_fork
  self << Fork.new(self, &@block)
end

#from_io(io, type) ⇒ Object

[View source]

119
120
121
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 119

def from_io(io, type)
  find { |i| i.__send__(type) == io }
end

#get_events(queue_length) ⇒ Object

[View source]

84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 84

def get_events(queue_length)
  # Wait for anything to happen:
  # - Either any of our workers return some data
  # - Or any workers become ready to receive data
  #   - But only if we have enough work for them
  ios = IO.select(
    map(&:read_io),
    sample(queue_length).map(&:write_io),
    []
  )
  return [[], []] unless ios

  events = ios[0].map do |io|
    io = from_io(io, :read_io)
    [io, *io.recv]
  end

  idles = ios[1].map do |io|
    from_io(io, :write_io)
  end

  # Progressively create forks, because we may not need all
  # the workers at the time. The number 6 was picked due to
  # some trial and error on a Ryzen machine.
  #
  # Do note that prefork may happen more than once.
  create_fork if length < @count && rand(6) == 1

  [events, idles]
end

#waitObject

[View source]

127
128
129
# File 'opal/lib/opal/builder_scheduler/prefork.rb', line 127

def wait
  each(&:wait)
end