Coverage for interlocks.py: 0%
127 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-27 18:50 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-27 18:50 +0000
1# -*- coding: utf-8 -*-
3# Code from http://ender.snowburst.org:4747/~jjohns/interlocks.py
4# Thanks JJ!
6from __future__ import print_function
8#import L10n
9#_ = L10n.get_translation()
11import sys
12import os, os.path
13import subprocess
14import time
15import signal
16import base64
18InterProcessLock = None
20"""
21Just use me like a thread lock. acquire() / release() / locked()
23Differences compared to thread locks:
241. By default, acquire()'s wait parameter is false.
252. When acquire fails, SingleInstanceError is thrown instead of simply returning false.
263. acquire() can take a 3rd parameter retry_time, which, if wait is True, tells the locking
27 mechanism how long to sleep between retrying the lock. Has no effect for unix/InterProcessLockFcntl.
29Differences in fpdb version to JJ's original:
301. Changed acquire() to return false like other locks
312. Made acquire fail if same process already has the lock
32"""
class SingleInstanceError(RuntimeError):
    """Raised when an InterProcessLock cannot be acquired.

    Signals that another instance of the process is already running and
    holds the lock.
    """
class InterProcessLockBase(object):
    """Bookkeeping shared by the platform-specific inter-process locks.

    Subclasses provide ``acquire_impl(wait)``, which must raise
    SingleInstanceError when the lock is unavailable, and ``release_impl()``.
    This class tracks whether this instance holds the lock (``_has_lock``)
    and who asked for it (``heldBy``).
    """

    def __init__(self, name=None):
        """Create a lock called *name*; falls back to ``sys.argv[0]`` when empty."""
        self._has_lock = False
        if not name:
            name = sys.argv[0]
        self.name = name
        self.heldBy = None

    def getHashedName(self):
        """Return the lock name base64-encoded, with the '=' padding removed."""
        encoded = base64.b64encode(self.name.encode())
        return encoded.replace(b'=', b'').decode()

    def acquire_impl(self, wait):
        """Take the platform lock; subclasses must override this."""
        raise NotImplementedError("acquire_impl must be provided by a subclass")

    def release_impl(self):
        """Drop the platform lock; subclasses must override this."""
        raise NotImplementedError("release_impl must be provided by a subclass")

    def acquire(self, source=None, wait=False, retry_time=1):
        """Try to take the lock.

        source     -- label stored in ``heldBy`` on success ("Unknown" if None).
        wait       -- passed to ``acquire_impl``; when true, a failed attempt
                      is retried instead of giving up.
        retry_time -- seconds to sleep between retries when *wait* is true.

        Returns True once the lock is held. Returns False if this instance
        already holds it, or if the attempt failed and *wait* is false.
        """
        if source is None:
            source = "Unknown"
        if self._has_lock:  # make sure 2nd acquire in same process fails
            print(("lock already held by:"), self.heldBy)
            return False
        while not self._has_lock:
            try:
                self.acquire_impl(wait)
            except SingleInstanceError:
                if not wait:
                    # Behave like an ordinary lock: report failure, don't raise.
                    return False
                time.sleep(retry_time)
            else:
                self._has_lock = True
                self.heldBy = source
        return True

    def release(self):
        """Release the platform lock and clear the ownership bookkeeping."""
        self.release_impl()
        self._has_lock = False
        self.heldBy = None

    def locked(self):
        """Return True if the lock is currently held, by this instance or another.

        When this instance does not hold the lock, the answer is found by
        briefly acquiring and releasing it.
        """
        if self._has_lock:
            return True
        if self.acquire(source="locked() probe"):
            self.release()
            return False
        return True
LOCK_FILE_DIRECTORY = '/tmp'


class InterProcessLockFcntl(InterProcessLockBase):
    """Inter-process lock implemented with ``fcntl.flock`` on a lock file.

    The lock file lives in LOCK_FILE_DIRECTORY and is named after the
    sanitised lock name with a ``.lck`` suffix.
    """

    def __init__(self, name=None):
        """Work out the lock file path; LOCK_FILE_DIRECTORY must already exist."""
        InterProcessLockBase.__init__(self, name)
        self.lockfd = 0
        self.lock_file_name = os.path.join(LOCK_FILE_DIRECTORY, self.getHashedName() + '.lck')
        assert(os.path.isdir(LOCK_FILE_DIRECTORY))

    # This is the suggested way to get a safe file name, but I like having a descriptively named lock file.
    def getHashedName(self):
        """Return the lock name with characters unsafe in file names replaced by '_'."""
        import re
        # A character class, so each unsafe character is replaced on its own.
        # In particular '/' must go: os.path.join would otherwise let an
        # absolute name escape LOCK_FILE_DIRECTORY.
        bad_filename_character_re = re.compile(r'[/?<>\\:;*|\'"^=.\[\]]')
        return bad_filename_character_re.sub('_', self.name)

    def acquire_impl(self, wait):
        """Open the lock file and take an exclusive flock on it.

        Without *wait* the flock is non-blocking. Raises SingleInstanceError
        if the flock call fails.
        """
        self.lockfd = open(self.lock_file_name, 'w')
        fcntrl_options = fcntl.LOCK_EX
        if not wait:
            fcntrl_options |= fcntl.LOCK_NB
        try:
            fcntl.flock(self.lockfd, fcntrl_options)
        except IOError:
            self.lockfd.close()
            self.lockfd = 0
            raise SingleInstanceError('Could not acquire exclusive lock on '+self.lock_file_name)

    def release_impl(self):
        """Unlock and close the lock file, then try to delete it."""
        # Unlock with flock to match acquire_impl; lockf is a different lock family.
        fcntl.flock(self.lockfd, fcntl.LOCK_UN)
        self.lockfd.close()
        self.lockfd = 0
        try:
            os.unlink(self.lock_file_name)
        except (IOError, OSError):
            # We don't care about the existence of the file too much here. It's the flock() we care about,
            # And that should just go away magically.
            pass
class InterProcessLockWin32(InterProcessLockBase):
    """Inter-process lock implemented with a named Win32 mutex."""

    def __init__(self, name=None):
        """Set up the lock with no mutex handle yet."""
        InterProcessLockBase.__init__(self, name)
        self.mutex = None

    def acquire_impl(self, wait):
        """Create the named mutex.

        Raises SingleInstanceError when the mutex already existed.
        *wait* is not used here; retrying is left to the caller.
        """
        self.mutex = win32event.CreateMutex(None, 0, self.getHashedName())
        if win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS:
            self.mutex.Close()
            self.mutex = None
            raise SingleInstanceError('Could not acquire exclusive lock on ' + self.name)

    def release_impl(self):
        """Close the mutex handle and forget it."""
        self.mutex.Close()
        # Don't keep a closed handle around (matches the other lock classes).
        self.mutex = None
class InterProcessLockSocket(InterProcessLockBase):
    """Inter-process lock implemented by binding a TCP port on 127.0.0.1.

    Whoever manages to bind the port derived from the lock name holds the lock.
    """

    def __init__(self, name=None):
        """Derive the port number for this lock name."""
        import zlib
        InterProcessLockBase.__init__(self, name)
        self.socket = None
        # str.__hash__ is randomised per process on Python 3, which would give
        # every process a different port. CRC-32 is the same in all of them.
        stable_hash = zlib.crc32(self.getHashedName().encode()) & 0xffffffff
        self.portno = 65530 - stable_hash % 32749

    def acquire_impl(self, wait):
        """Bind the lock port.

        Raises SingleInstanceError if the bind fails. *wait* is not used
        here; retrying is left to the caller.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.bind(('127.0.0.1', self.portno))
        except socket.error:
            self.socket.close()
            self.socket = None
            raise SingleInstanceError('Could not acquire exclusive lock on ' + self.name)

    def release_impl(self):
        """Close the bound socket, freeing the port."""
        self.socket.close()
        self.socket = None
# Set InterProcessLock to the correct type given the system parameters available:
# fcntl if it can be imported, otherwise the pywin32 modules, otherwise a socket.
try:
    import fcntl
except ImportError:
    try:
        import win32event
        import win32api
        import winerror
    except ImportError:
        import socket
        InterProcessLock = InterProcessLockSocket
    else:
        InterProcessLock = InterProcessLockWin32
else:
    InterProcessLock = InterProcessLockFcntl
def test_construct():
    """Doctest scenario for InterProcessLock, within one process and across processes.

    # Making the name of the test unique so it can be executed by multiple users on the same machine.
    >>> test_name = 'InterProcessLockTest' +str(os.getpid()) + str(time.time())

    >>> lock1 = InterProcessLock(name=test_name)
    >>> lock1.acquire('doctest')
    True

    >>> lock2 = InterProcessLock(name=test_name)
    >>> lock3 = InterProcessLock(name=test_name)

    # Since lock1 is locked, other attempts to acquire it fail.
    >>> lock2.acquire('doctest')
    False

    >>> lock3.acquire('doctest')
    False

    # Release the lock and let lock2 have it.
    >>> lock1.release()
    >>> lock2.acquire('doctest')
    True

    >>> lock3.acquire('doctest')
    False

    # Release it and give it back to lock1
    >>> lock2.release()
    >>> lock1.acquire('doctest')
    True

    >>> lock2.acquire('doctest')
    False

    # Test lock status
    >>> lock2.locked()
    True
    >>> lock3.locked()
    True
    >>> lock1.locked()
    True

    >>> lock1.release()

    >>> lock2.locked()
    False
    >>> lock3.locked()
    False
    >>> lock1.locked()
    False

    >>> if os.name == 'posix':
    ...     def os_independent_kill(pid):
    ...         import signal
    ...         os.kill(pid, signal.SIGKILL)
    ... else:
    ...     assert(os.name == 'nt')
    ...     def os_independent_kill(pid):
    ...         ''' http://www.python.org/doc/faq/windows/#how-do-i-emulate-os-kill-in-windows '''
    ...         import win32api
    ...         import win32con
    ...         import pywintypes
    ...         handle = win32api.OpenProcess(win32con.PROCESS_TERMINATE , pywintypes.FALSE, pid)
    ...         win32api.TerminateProcess(handle, 0)

    # Test to acquire the lock in another process.
    >>> def execute(cmd):
    ...     cmd = 'import time;' + cmd + 'time.sleep(10);'
    ...     process = subprocess.Popen([sys.executable, '-c', cmd])
    ...     pid = process.pid
    ...     time.sleep(2) # quick hack, but we test synchronization in the end
    ...     return pid

    >>> pid = execute('import interlocks;a=interlocks.InterProcessLock(name=\\''+test_name+ '\\');a.acquire("doctest");')

    >>> lock1.acquire('doctest')
    False

    >>> os_independent_kill(pid)

    >>> time.sleep(1)

    >>> lock1.acquire('doctest')
    True
    >>> lock1.release()

    # Testing wait

    >>> pid = execute('import interlocks;a=interlocks.InterProcessLock(name=\\''+test_name+ '\\');a.acquire("doctest");')

    >>> lock1.acquire('doctest')
    False

    >>> os_independent_kill(pid)

    >>> lock1.acquire('doctest', True)
    True
    >>> lock1.release()

    """
if __name__ == '__main__':
    # Run as a script: execute the doctests embedded in this module,
    # with the IGNORE_EXCEPTION_DETAIL option flag set.
    import doctest
    doctest.testmod(optionflags=doctest.IGNORE_EXCEPTION_DETAIL)