Skip to content
Merged

Monitor #2991

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion include/natalie/thread/mutex_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Thread::MutexObject : public Object {
MutexObject(ClassObject *klass)
: Object { Object::Type::ThreadMutex, klass } { }

Value lock(Env *);
Value lock(Env *, bool interruptible = true);
Value sleep(Env *, Optional<Value> = {});
Value synchronize(Env *, Block *);
bool try_lock();
Expand All @@ -32,6 +32,8 @@ class Thread::MutexObject : public Object {
virtual void visit_children(Visitor &) const override;

private:
void record_owner();

std::mutex m_mutex;
ThreadObject *m_thread { nullptr };
FiberObject *m_fiber { nullptr };
Expand Down
127 changes: 127 additions & 0 deletions lib/monitor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# frozen_string_literal: false

# Mix in to add reentrant monitor behavior to a class. Implemented on top of
# Mutex and Thread::ConditionVariable.
module MonitorMixin
def self.extend_object(obj)
super(obj)
obj.__send__(:mon_initialize)
end

def try_enter
if @mon_owner == Thread.current
@mon_count += 1
true
elsif @mon_mutex.try_lock
@mon_owner = Thread.current
@mon_count = 1
true
else
false
end
end

def enter
if @mon_owner == Thread.current
@mon_count += 1
else
@mon_mutex.lock
@mon_owner = Thread.current
@mon_count = 1
end
nil
end

def exit
mon_check_owner
@mon_count -= 1
if @mon_count == 0
@mon_owner = nil
@mon_mutex.unlock
end
nil
end

def mon_locked?
@mon_mutex.locked?
end

def mon_owned?
@mon_owner == Thread.current
end

def mon_check_owner
raise ThreadError, 'current thread not owner' if @mon_owner != Thread.current
end

def synchronize
enter
begin
yield
ensure
exit
end
end

def new_cond
ConditionVariable.new(self)
end

def wait_for_cond(cond, timeout)
mon_check_owner
count = @mon_count
@mon_owner = nil
@mon_count = 0
begin
cond.wait(@mon_mutex, timeout)
ensure
@mon_owner = Thread.current
@mon_count = count
end
true
end

class ConditionVariable
def initialize(monitor)
@monitor = monitor
@cond = Thread::ConditionVariable.new
end

def wait(timeout = nil)
@monitor.wait_for_cond(@cond, timeout)
end

def wait_until
wait until yield
end

def wait_while
wait while yield
end

def signal
@cond.signal
end

def broadcast
@cond.broadcast
end
end

private

def initialize(...)
super
mon_initialize
end

def mon_initialize
@mon_mutex = Mutex.new
@mon_owner = nil
@mon_count = 0
end
end

class Monitor
include MonitorMixin
end
7 changes: 2 additions & 5 deletions spec/core/conditionvariable/wait_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,7 @@
th.kill
th.join

NATFIXME 'Probably an issue with Mutex, not with ConditionVariable', exception: SpecFailedException do
owned.should == true
end
owned.should == true
end

it "reacquires the lock even if the thread is killed after being signaled" do
Expand Down Expand Up @@ -139,8 +137,7 @@
}

th.join
# NATFIXME: Inconsistent behaviour seen in runs. Probably an issue with Mutex, not with ConditionVariable
# owned.should == true
owned.should == true
end

it "supports multiple Threads waiting on the same ConditionVariable and Mutex" do
Expand Down
4 changes: 1 addition & 3 deletions spec/core/mutex/sleep_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@
Thread.pass until locked
Thread.pass until th.stop?
th.raise(Exception)
NATFIXME 'relocks the mutex when woken by an exception being raised', exception: SpecFailedException do
th.value.should be_true
end
th.value.should be_true
end

it "returns the rounded number of seconds asleep" do
Expand Down
28 changes: 28 additions & 0 deletions spec/library/monitor/enter_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
require_relative '../../spec_helper'
require 'monitor'

describe "Monitor#enter" do
it "acquires the monitor" do
monitor = Monitor.new
10.times do
wait_q = Queue.new
continue_q = Queue.new

thread = Thread.new do
begin
monitor.enter
wait_q << true
continue_q.pop
ensure
monitor.exit
end
end

wait_q.pop
monitor.mon_locked?.should == true
continue_q << true
thread.join
monitor.mon_locked?.should == false
end
end
end
10 changes: 10 additions & 0 deletions spec/library/monitor/exit_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
require_relative '../../spec_helper'
require 'monitor'

describe "Monitor#exit" do
it "raises ThreadError when monitor is not entered" do
m = Monitor.new

-> { m.exit }.should raise_error(ThreadError)
end
end
31 changes: 31 additions & 0 deletions spec/library/monitor/mon_initialize_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
require_relative '../../spec_helper'
require 'monitor'

describe "MonitorMixin#mon_initialize" do
it "can be called in initialize_copy to get a new Mutex and used with synchronize" do
cls = Class.new do
include MonitorMixin

def initialize(*array)
mon_initialize
@array = array
end

def to_a
synchronize { @array.dup }
end

def initialize_copy(other)
mon_initialize

synchronize do
@array = other.to_a
end
end
end

instance = cls.new(1, 2, 3)
copy = instance.dup
copy.should_not equal(instance)
end
end
88 changes: 88 additions & 0 deletions spec/library/monitor/new_cond_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
require_relative '../../spec_helper'
require 'monitor'

describe "Monitor#new_cond" do
it "creates a MonitorMixin::ConditionVariable" do
m = Monitor.new
c = m.new_cond
c.class.should == MonitorMixin::ConditionVariable
end

it 'returns a condition variable which can be waited on by a thread holding the monitor' do
m = Monitor.new
c = m.new_cond

10.times do

wait_q = Queue.new
thread = Thread.new do
m.synchronize do
wait_q << true
c.wait
end
:done
end

wait_q.pop

# Synchronize can't happen until the other thread is waiting.
m.synchronize { c.signal }

thread.join
thread.value.should == :done
end
end

it 'returns a condition variable which can be waited on by a thread holding the monitor inside multiple synchronize blocks' do
m = Monitor.new
c = m.new_cond

10.times do

wait_q = Queue.new
thread = Thread.new do
m.synchronize do
m.synchronize do
wait_q << true
c.wait
end
end
:done
end

wait_q.pop

#No need to wait here as we cannot synchronize until the other thread is waiting.
m.synchronize { c.signal }

thread.join
thread.value.should == :done
end
end

it 'returns a condition variable which can be signalled by a thread holding the monitor inside multiple synchronize blocks' do
m = Monitor.new
c = m.new_cond

10.times do

wait_q = Queue.new
thread = Thread.new do
m.synchronize do
wait_q << true
c.wait
end
:done
end

wait_q.pop

# Synchronize can't happen until the other thread is waiting.
m.synchronize { m.synchronize { c.signal } }

thread.join
thread.value.should == :done
end
end

end
41 changes: 41 additions & 0 deletions spec/library/monitor/synchronize_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
require_relative '../../spec_helper'
require 'monitor'

describe "Monitor#synchronize" do
it "unlocks after return, even if it was interrupted by Thread#raise" do
exc_class = Class.new(RuntimeError)

monitor = Monitor.new
10.times do
wait_q = Queue.new
continue_q = Queue.new

thread = Thread.new do
begin
monitor.synchronize do
wait_q << true
# Do not wait here, we are trying to interrupt the ensure part of #synchronize
end
continue_q.pop
rescue exc_class
monitor.should_not.mon_locked?
:ok
end
end

wait_q.pop
thread.raise exc_class, "interrupt"
continue_q << true
thread.value.should == :ok
end
end

it "raises a LocalJumpError if not passed a block" do
-> { Monitor.new.synchronize }.should raise_error(LocalJumpError)
end

it "raises a thread error if the monitor is not owned on exiting the block" do
monitor = Monitor.new
-> { monitor.synchronize { monitor.exit } }.should raise_error(ThreadError)
end
end
Loading
Loading