Discussion: A lock that prioritizes acquire()s?
David M Chess
12 years ago
Okay, next silly question. :)

We have a very simple multi-threaded system where a request comes in,
starts running in a thread, and then (zero, one, or two times per request)
gets to a serialization point, where the code does:

with lock:
    do_critical_section_stuff_that_might_take_awhile()

and then continues.

Which is almost the same as:

lock.acquire()
try:
    do_critical_section_stuff_that_might_take_awhile()
finally:
    lock.release()

Now we discover that It Would Be Nice if some requests got priority over
others, as in something like:

lock.acquire(importance=request.importance)
try:
    do_critical_section_stuff_that_might_take_awhile()
finally:
    lock.release()

and when lock.release() occurs, the next thread that gets to run is one of
the most important ones currently waiting in acquire() (that's the
exciting new thing).

Other requirements are that the code to do this be as simple as possible,
and that it not mess anything else up. :)

My first thought was something like a new lock-ish class that would do
roughly:

import threading

class PriorityLock(object):

    def __init__(self):
        self._lock = threading.Lock()
        self._waiter_map = {}  # maps waiting threads to importance

    def acquire(self, importance=0):
        this_thread = threading.currentThread()
        self._waiter_map[this_thread] = importance  # I want in
        while True:
            self._lock.acquire()
            if max(self._waiter_map.values()) <= importance:  # we win
                del self._waiter_map[this_thread]  # not waiting anymore
                return  # return with lock acquired
            self._lock.release()  # We are not most impt: release/retry

    def release(self):
        self._lock.release()

(Hope the mail doesn't garble that too badly.)

Basically the acquire() method just immediately releases and tries again
if it finds that someone more important is waiting.
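
A minimal usage sketch of how this would slot into the request-handling code (handle_request here is just a hypothetical stand-in for the real handler):

lock = PriorityLock()

def handle_request(request):
    # Each request runs in its own thread and hits the
    # serialization point zero, one, or two times.
    lock.acquire(importance=request.importance)
    try:
        do_critical_section_stuff_that_might_take_awhile()
    finally:
        lock.release()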

I think this is semantically correct, as long as the underlying lock
implementation doesn't have starvation issues, and it's nice and simple,
but on the other hand it looks eyerollingly inefficient.

Seeking any thoughts on other/better ways to do this, or on whether the
inefficiency will be too eyerolling given our load: say one request per
second, an average service time a bit under a second but a maximum service
time well over a second, with most requests at importance zero and one or
two higher-importance ones every so many seconds.

Tx,
DC

MRAB
12 years ago
...
Here's my take on it:

import threading
import time

class PriorityLock(object):

    def __init__(self):
        self._lock = threading.Lock()
        self._waiter_queue = []
        self._queue_lock = threading.Lock()

    def acquire(self, importance=0):
        this_thread = threading.currentThread()

        # Add this thread to the queue.
        with self._queue_lock:
            self._waiter_queue.append((importance, this_thread))
            # Move the most important to the start.
            self._waiter_queue.sort(reverse=True, key=lambda pair: pair[0])

        # Acquire and retain the lock when this thread is at the start of the queue.
        while True:
            self._lock.acquire()

            with self._queue_lock:
                if self._waiter_queue[0][1] == this_thread:  # We win.
                    del self._waiter_queue[0]  # Not waiting anymore.
                    return  # Return with lock acquired.

            self._lock.release()  # We are not most important: release and retry.
            time.sleep(0.01)  # Give the other threads a chance.

    def release(self):
        self._lock.release()
Ian Kelly
12 years ago
Post by David M Chess
Seeking any thoughts on other/better ways to do this, or whether the
inefficiency will be too eyerolling if we get say one request per second
with an average service time a bit under a second but maximum service time
well over a second, and most of them are importance zero, but every (many)
seconds there will be one or two with higher importance.
I used a PriorityQueue and Conditions to get rid of the ugly while True loop.


import threading
from Queue import PriorityQueue, Empty

class PriorityLock(object):

    def __init__(self):
        self._is_available = True
        self._mutex = threading.Lock()
        self._waiter_queue = PriorityQueue()

    def acquire(self, priority=0):
        self._mutex.acquire()
        # First, just check the lock.
        if self._is_available:
            self._is_available = False
            self._mutex.release()
            return True
        condition = threading.Condition()
        condition.acquire()
        self._waiter_queue.put((priority, condition))
        self._mutex.release()
        condition.wait()
        condition.release()
        return True

    def release(self):
        self._mutex.acquire()
        # Notify the next thread in line, if any.
        try:
            _, condition = self._waiter_queue.get_nowait()
        except Empty:
            self._is_available = True
        else:
            condition.acquire()
            condition.notify()
            condition.release()
        self._mutex.release()

def test():
    import random, time

    def thread(lock, priority):
        lock.acquire(priority)
        print("Thread %d running" % priority)
        time.sleep(1)
        lock.release()

    lock = PriorityLock()
    threads = [threading.Thread(target=thread, args=(lock, x)) for x in range(10)]
    random.shuffle(threads)
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    test()


Output:

Thread 9 running
Thread 0 running
Thread 1 running
Thread 2 running
Thread 3 running
Thread 4 running
Thread 5 running
Thread 6 running
Thread 7 running
Thread 8 running

Note that with the PriorityQueue, lower priority values are retrieved
first. Thread 9 ran first just by virtue of being first to the gate,
and after that you can see that everything went in order.
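
A quick sketch of that ordering with just the queue and no threads (same Python 2 Queue module as above):

from Queue import PriorityQueue

q = PriorityQueue()
for priority in (5, 1, 3):
    q.put((priority, "item-%d" % priority))

print(q.get())  # (1, 'item-1') -- lowest priority value comes out first
print(q.get())  # (3, 'item-3')
print(q.get())  # (5, 'item-5')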

Cheers,
Ian
Ian Kelly
12 years ago
Post by Ian Kelly
I used a PriorityQueue and Conditions to get rid of the ugly while True loop.
Same thing, but with Events instead of Conditions. This is just a
bit more readable.

The PriorityQueue is also probably unnecessary, since it's always
accessed with the mutex held. A heapq would be fine (a rough sketch of
that variant follows the code below).


import threading
import Queue

class PriorityLock(object):

    def __init__(self):
        self._is_available = True
        self._mutex = threading.Lock()
        self._waiter_queue = Queue.PriorityQueue()

    def acquire(self, priority=0):
        self._mutex.acquire()
        # First, just check the lock.
        if self._is_available:
            self._is_available = False
            self._mutex.release()
            return True
        event = threading.Event()
        self._waiter_queue.put((priority, event))
        self._mutex.release()
        event.wait()
        # When the event is triggered, we have the lock.
        return True

    def release(self):
        self._mutex.acquire()
        # Notify the next thread in line, if any.
        try:
            _, event = self._waiter_queue.get_nowait()
        except Queue.Empty:
            self._is_available = True
        else:
            event.set()
        self._mutex.release()
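
And a rough sketch of the heapq variant mentioned above (the itertools counter is just an assumed tie-breaker for equal priorities, so the heap never has to compare Event objects):

import heapq
import itertools
import threading

class HeapPriorityLock(object):

    def __init__(self):
        self._is_available = True
        self._mutex = threading.Lock()
        self._waiters = []                 # heap of (priority, count, event)
        self._counter = itertools.count()  # tie-breaker for equal priorities

    def acquire(self, priority=0):
        with self._mutex:
            # First, just check the lock.
            if self._is_available:
                self._is_available = False
                return True
            event = threading.Event()
            heapq.heappush(self._waiters, (priority, next(self._counter), event))
        event.wait()
        # When the event is triggered, we have the lock.
        return True

    def release(self):
        with self._mutex:
            # Notify the next thread in line, if any.
            if self._waiters:
                _, _, event = heapq.heappop(self._waiters)
                event.set()
            else:
                self._is_available = True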
David M Chess
12 years ago
Lovely, thanks for the ideas! I remember considering having release()
pick the next thread to notify, where all the waiters were sitting on
separate Conditions or whatever; not sure why I didn't pursue it to the
end. Probably distracted by something shiny; or insufficient brainpower.
:) DC