Coverage for interlocks.py: 0%
126 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-18 00:10 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-18 00:10 +0000
1# -*- coding: utf-8 -*-
3# Code from http://ender.snowburst.org:4747/~jjohns/interlocks.py
4# Thanks JJ!
6from __future__ import print_function
8# import L10n
9# _ = L10n.get_translation()
11import sys
12import os, os.path
13import time
14import base64
16InterProcessLock = None
18"""
19Just use me like a thread lock. acquire() / release() / locked()
21Differences compared to thread locks:
221. By default, acquire()'s wait parameter is false.
232. When acquire fails, SingleInstanceError is thrown instead of simply returning false.
243. acquire() can take a 3rd parameter retry_time, which, if wait is True, tells the locking
25 mechanism how long to sleep between retrying the lock. Has no effect for unix/InterProcessLockFcntl.
27Differences in fpdb version to JJ's original:
281. Changed acquire() to return false like other locks
292. Made acquire fail if same process already has the lock
30"""
33class SingleInstanceError(RuntimeError):
34 "Thrown when you try to acquire an InterProcessLock and another version of the process is already running."
37class InterProcessLockBase(object):
38 def __init__(self, name=None):
39 self._has_lock = False
40 if not name:
41 name = sys.argv[0]
42 self.name = name
43 self.heldBy = None
45 def getHashedName(self):
46 print(self.name) # debug
47 test = base64.b64encode(self.name.encode())
48 print(test)
49 test = test.replace(b"=", b"")
50 print(test)
51 test = test.decode()
52 print(test)
53 return test
55 def acquire_impl(self, wait):
56 pass
58 def acquire(self, source, wait=False, retry_time=1):
59 if source is None:
60 source = "Unknown"
61 if self._has_lock: # make sure 2nd acquire in same process fails
62 print(("lock already held by:"), self.heldBy)
63 return False
64 while not self._has_lock:
65 try:
66 self.acquire_impl(wait)
67 self._has_lock = True
68 self.heldBy = source
69 # print 'i have the lock'
70 except SingleInstanceError:
71 if not wait:
72 # raise # change back to normal acquire functionality, sorry JJ!
73 return False
74 time.sleep(retry_time)
75 return True
77 def release(self):
78 self.release_impl()
79 self._has_lock = False
80 self.heldBy = None
82 def locked(self):
83 if self.acquire():
84 self.release()
85 return False
86 return True
89LOCK_FILE_DIRECTORY = "/tmp"
92class InterProcessLockFcntl(InterProcessLockBase):
93 def __init__(self, name=None):
94 InterProcessLockBase.__init__(self, name)
95 self.lockfd = 0
96 self.lock_file_name = os.path.join(LOCK_FILE_DIRECTORY, self.getHashedName() + ".lck")
97 assert os.path.isdir(LOCK_FILE_DIRECTORY)
99 # This is the suggested way to get a safe file name, but I like having a descriptively named lock file.
100 def getHashedName(self):
101 import re
103 bad_filename_character_re = re.compile(r"/\?<>\\\:;\*\|\'\"\^=\.\[\]")
104 return bad_filename_character_re.sub("_", self.name)
106 def acquire_impl(self, wait):
107 self.lockfd = open(self.lock_file_name, "w")
108 fcntrl_options = fcntl.LOCK_EX
109 if not wait:
110 fcntrl_options |= fcntl.LOCK_NB
111 try:
112 fcntl.flock(self.lockfd, fcntrl_options)
113 except IOError:
114 self.lockfd.close()
115 self.lockfd = 0
116 raise SingleInstanceError("Could not acquire exclusive lock on " + self.lock_file_name)
118 def release_impl(self):
119 fcntl.lockf(self.lockfd, fcntl.LOCK_UN)
120 self.lockfd.close()
121 self.lockfd = 0
122 try:
123 os.unlink(self.lock_file_name)
124 except IOError:
125 # We don't care about the existence of the file too much here. It's the flock() we care about,
126 # And that should just go away magically.
127 pass
130class InterProcessLockWin32(InterProcessLockBase):
131 def __init__(self, name=None):
132 InterProcessLockBase.__init__(self, name)
133 self.mutex = None
135 def acquire_impl(self, wait):
136 self.mutex = win32event.CreateMutex(None, 0, self.getHashedName())
137 if win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS:
138 self.mutex.Close()
139 self.mutex = None
140 raise SingleInstanceError("Could not acquire exclusive lock on " + self.name)
142 def release_impl(self):
143 self.mutex.Close()
146class InterProcessLockSocket(InterProcessLockBase):
147 def __init__(self, name=None):
148 InterProcessLockBase.__init__(self, name)
149 self.socket = None
150 self.portno = 65530 - abs(self.getHashedName().__hash__()) % 32749
152 def acquire_impl(self, wait):
153 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
154 try:
155 self.socket.bind(("127.0.0.1", self.portno))
156 except socket.error:
157 self.socket.close()
158 self.socket = None
159 raise SingleInstanceError("Could not acquire exclusive lock on " + self.name)
161 def release_impl(self):
162 self.socket.close()
163 self.socket = None
166# Set InterProcessLock to the correct type given the sysem parameters available
167try:
168 import fcntl
170 InterProcessLock = InterProcessLockFcntl
171except ImportError:
172 try:
173 import win32event
174 import win32api
175 import winerror
177 InterProcessLock = InterProcessLockWin32
178 except ImportError:
179 import socket
181 InterProcessLock = InterProcessLockSocket
184def test_construct():
185 """
186 # Making the name of the test unique so it can be executed my multiple users on the same machine.
187 >>> test_name = 'InterProcessLockTest' +str(os.getpid()) + str(time.time())
189 >>> lock1 = InterProcessLock(name=test_name)
190 >>> lock1.acquire()
191 True
193 >>> lock2 = InterProcessLock(name=test_name)
194 >>> lock3 = InterProcessLock(name=test_name)
196 # Since lock1 is locked, other attempts to acquire it fail.
197 >>> lock2.acquire()
198 False
200 >>> lock3.acquire()
201 False
203 # Release the lock and let lock2 have it.
204 >>> lock1.release()
205 >>> lock2.acquire()
206 True
208 >>> lock3.acquire()
209 False
211 # Release it and give it back to lock1
212 >>> lock2.release()
213 >>> lock1.acquire()
214 True
216 >>> lock2.acquire()
217 False
219 # Test lock status
220 >>> lock2.locked()
221 True
222 >>> lock3.locked()
223 True
224 >>> lock1.locked()
225 True
227 >>> lock1.release()
229 >>> lock2.locked()
230 False
231 >>> lock3.locked()
232 False
233 >>> lock1.locked()
234 False
236 >>> if os.name == 'posix':
237 ... def os_independent_kill(pid):
238 ... import signal
239 ... os.kill(pid, signal.SIGKILL)
240 ... else:
241 ... assert(os.name == 'nt')
242 ... def os_independent_kill(pid):
243 ... ''' http://www.python.org/doc/faq/windows/#how-do-i-emulate-os-kill-in-windows '''
244 ... import win32api
245 ... import win32con
246 ... import pywintypes
247 ... handle = win32api.OpenProcess(win32con.PROCESS_TERMINATE , pywintypes.FALSE, pid)
248 ... #return (0 != win32api.TerminateProcess(handle, 0))
250 # Test to acquire the lock in another process.
251 >>> def execute(cmd):
252 ... cmd = 'import time;' + cmd + 'time.sleep(10);'
253 ... process = subprocess.Popen([sys.executable, '-c', cmd])
254 ... pid = process.pid
255 ... time.sleep(2) # quick hack, but we test synchronization in the end
256 ... return pid
258 >>> pid = execute('import interlocks;a=interlocks.InterProcessLock(name=\\''+test_name+ '\\');a.acquire();')
260 >>> lock1.acquire()
261 False
263 >>> os_independent_kill(pid)
265 >>> time.sleep(1)
267 >>> lock1.acquire()
268 True
269 >>> lock1.release()
271 # Testing wait
273 >>> pid = execute('import interlocks;a=interlocks.InterProcessLock(name=\\''+test_name+ '\\');a.acquire();')
275 >>> lock1.acquire()
276 False
278 >>> os_independent_kill(pid)
280 >>> lock1.acquire(True)
281 True
282 >>> lock1.release()
284 """
286 pass
289if __name__ == "__main__":
290 import doctest
292 doctest.testmod(optionflags=doctest.IGNORE_EXCEPTION_DETAIL)