Coverage for interlocks.py: 0%

127 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-28 16:41 +0000

1# -*- coding: utf-8 -*- 

2 

3# Code from http://ender.snowburst.org:4747/~jjohns/interlocks.py 

4# Thanks JJ! 

5 

6from __future__ import print_function 

7 

8#import L10n 

9#_ = L10n.get_translation() 

10 

11import sys 

12import os, os.path 

13import subprocess 

14import time 

15import signal 

16import base64 

17 

18InterProcessLock = None 

19 

20""" 

21Just use me like a thread lock. acquire() / release() / locked() 

22 

23Differences compared to thread locks: 

241. By default, acquire()'s wait parameter is false. 

252. When acquire fails, SingleInstanceError is thrown instead of simply returning false. 

263. acquire() can take a 3rd parameter retry_time, which, if wait is True, tells the locking  

27 mechanism how long to sleep between retrying the lock. Has no effect for unix/InterProcessLockFcntl. 

28 

29Differences in fpdb version to JJ's original: 

301. Changed acquire() to return false like other locks 

312. Made acquire fail if same process already has the lock 

32""" 

33 

class SingleInstanceError(RuntimeError):
    """Signals that an InterProcessLock could not be acquired.

    Raised by the platform-specific ``acquire_impl`` methods when another
    holder already owns the lock.
    """

36 

class InterProcessLockBase(object):
    """Platform-independent bookkeeping for an inter-process lock.

    Subclasses provide ``acquire_impl(wait)``, which must raise
    ``SingleInstanceError`` when the lock cannot be taken, and
    ``release_impl()``.
    """

    def __init__(self, name=None):
        """Create a lock called *name* (defaults to ``sys.argv[0]``)."""
        self._has_lock = False
        if not name:
            name = sys.argv[0]
        self.name = name
        # Label passed to acquire() by whoever currently holds the lock.
        self.heldBy = None

    def getHashedName(self):
        """Return ``self.name`` base64-encoded with the '=' padding removed."""
        encoded = base64.b64encode(self.name.encode())
        return encoded.replace(b'=', b'').decode()

    def acquire_impl(self, wait):
        """Take the platform lock; subclasses must override."""
        raise NotImplementedError("acquire_impl must be implemented by a subclass")

    def release_impl(self):
        """Drop the platform lock; subclasses must override."""
        raise NotImplementedError("release_impl must be implemented by a subclass")

    def acquire(self, source=None, wait=False, retry_time=1):
        """Try to take the lock.

        source     -- label stored in ``heldBy`` on success ("Unknown" if None).
        wait       -- if true, keep retrying until the lock is obtained.
        retry_time -- seconds to sleep between retries when *wait* is true.

        Returns True when the lock was obtained.  Returns False when this
        object already holds it, or when it is unavailable and *wait* is false.
        """
        if source is None:
            source = "Unknown"
        if self._has_lock:  # make sure 2nd acquire in same process fails
            print("lock already held by:", self.heldBy)
            return False
        while not self._has_lock:
            try:
                self.acquire_impl(wait)
            except SingleInstanceError:
                if not wait:
                    return False
                time.sleep(retry_time)
            else:
                self._has_lock = True
                self.heldBy = source
        return True

    def release(self):
        """Release the platform lock and clear the holder bookkeeping."""
        self.release_impl()
        self._has_lock = False
        self.heldBy = None

    def locked(self):
        """Return True if the lock is currently held (by this object or elsewhere).

        When this object does not hold the lock, the state is probed by
        briefly acquiring and releasing it.
        """
        if self._has_lock:
            return True
        if self.acquire():
            self.release()
            return False
        return True

86 

87LOCK_FILE_DIRECTORY = '/tmp' 

88 

class InterProcessLockFcntl(InterProcessLockBase):
    """Inter-process lock held as an exclusive ``flock()`` on a file in LOCK_FILE_DIRECTORY."""

    def __init__(self, name=None):
        InterProcessLockBase.__init__(self, name)
        # Open file object while the lock file is open, 0 otherwise.
        self.lockfd = 0
        self.lock_file_name = os.path.join(LOCK_FILE_DIRECTORY, self.getHashedName() + '.lck')
        assert(os.path.isdir(LOCK_FILE_DIRECTORY))

    # The base class's base64 name is the suggested way to get a safe file name,
    # but a descriptively named lock file is nicer, so unsafe characters are
    # simply replaced with underscores.
    def getHashedName(self):
        """Return ``self.name`` with characters unsafe in file names replaced by '_'."""
        import re
        # A character class: ANY one of these characters is replaced.
        bad_filename_character_re = re.compile(r'[/\?<>\\:;\*\|\'\"\^=\.\[\]]')
        return bad_filename_character_re.sub('_', self.name)

    def acquire_impl(self, wait):
        """Open the lock file and flock() it exclusively.

        Blocks when *wait* is true, otherwise fails immediately.
        Raises SingleInstanceError if the lock cannot be obtained.
        """
        self.lockfd = open(self.lock_file_name, 'w')
        fcntrl_options = fcntl.LOCK_EX
        if not wait:
            fcntrl_options |= fcntl.LOCK_NB
        try:
            fcntl.flock(self.lockfd, fcntrl_options)
        except (IOError, OSError):
            self.lockfd.close()
            self.lockfd = 0
            raise SingleInstanceError('Could not acquire exclusive lock on '+self.lock_file_name)

    def release_impl(self):
        """Unlock and close the lock file, then remove it (best effort)."""
        # Unlock with the same primitive that took the lock (flock, not lockf).
        fcntl.flock(self.lockfd, fcntl.LOCK_UN)
        self.lockfd.close()
        self.lockfd = 0
        # NOTE(review): removing the file can race with another process that has
        # already opened the old file -- confirm whether that matters here.
        try:
            os.unlink(self.lock_file_name)
        except OSError:
            # We don't care about the existence of the file too much here.
            # It's the flock() we care about, and that is already released.
            pass

124 

class InterProcessLockWin32(InterProcessLockBase):
    """Inter-process lock based on a named Win32 mutex (requires pywin32)."""

    def __init__(self, name=None):
        InterProcessLockBase.__init__(self, name)
        # Mutex handle while the lock is held, None otherwise.
        self.mutex = None

    def acquire_impl(self, wait):
        """Create the named mutex; fail if a mutex of that name already exists.

        *wait* is not used here; retrying is handled by the base class acquire().
        Raises SingleInstanceError if the mutex already exists.
        """
        self.mutex = win32event.CreateMutex(None, 0, self.getHashedName())
        if win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS:
            self.mutex.Close()
            self.mutex = None
            raise SingleInstanceError('Could not acquire exclusive lock on ' + self.name)

    def release_impl(self):
        """Close the mutex handle and forget it."""
        self.mutex.Close()
        # Reset like the failure path above so no closed handle is kept around.
        self.mutex = None

139 

class InterProcessLockSocket(InterProcessLockBase):
    """Inter-process lock held by binding a TCP socket on 127.0.0.1 to a name-derived port."""

    def __init__(self, name=None):
        import zlib
        InterProcessLockBase.__init__(self, name)
        # Bound socket while the lock is held, None otherwise.
        self.socket = None
        # The port must be identical in every process using the same name.
        # The built-in string hash is randomised per process on Python 3,
        # so a CRC of the name is used instead.
        stable_hash = zlib.crc32(self.getHashedName().encode('ascii')) & 0xffffffff
        self.portno = 65530 - stable_hash % 32749

    def acquire_impl(self, wait):
        """Bind the port; raise SingleInstanceError if the bind fails.

        *wait* is not used here; retrying is handled by the base class acquire().
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.bind(('127.0.0.1', self.portno))
        except socket.error:
            self.socket.close()
            self.socket = None
            raise SingleInstanceError('Could not acquire exclusive lock on ' + self.name)

    def release_impl(self):
        """Close the socket, freeing the port."""
        self.socket.close()
        self.socket = None

158 

# Set InterProcessLock to the correct type given the system modules available:
# fcntl first, then the pywin32 modules, and a socket-based lock as the fallback.
try:
    import fcntl
except ImportError:
    try:
        import win32event
        import win32api
        import winerror
    except ImportError:
        import socket
        InterProcessLock = InterProcessLockSocket
    else:
        InterProcessLock = InterProcessLockWin32
else:
    InterProcessLock = InterProcessLockFcntl

172 

def test_construct():
    """
    Doctest exercising InterProcessLock within one process and across processes.

    # Making the name of the test unique so it can be executed by multiple users on the same machine.
    >>> test_name = 'InterProcessLockTest' + str(os.getpid()) + str(time.time())

    >>> lock1 = InterProcessLock(name=test_name)
    >>> lock1.acquire('lock1')
    True

    >>> lock2 = InterProcessLock(name=test_name)
    >>> lock3 = InterProcessLock(name=test_name)

    # Since lock1 is locked, other attempts to acquire it fail.
    >>> lock2.acquire('lock2')
    False

    >>> lock3.acquire('lock3')
    False

    # Release the lock and let lock2 have it.
    >>> lock1.release()
    >>> lock2.acquire('lock2')
    True

    >>> lock3.acquire('lock3')
    False

    # Release it and give it back to lock1
    >>> lock2.release()
    >>> lock1.acquire('lock1')
    True

    >>> lock2.acquire('lock2')
    False

    # Test lock status
    >>> lock2.locked()
    True
    >>> lock3.locked()
    True
    >>> lock1.locked()
    True

    >>> lock1.release()

    >>> lock2.locked()
    False
    >>> lock3.locked()
    False
    >>> lock1.locked()
    False

    >>> if os.name == 'posix':
    ...     def os_independent_kill(pid):
    ...         import signal
    ...         os.kill(pid, signal.SIGKILL)
    ... else:
    ...     assert(os.name == 'nt')
    ...     def os_independent_kill(pid):
    ...         ''' http://www.python.org/doc/faq/windows/#how-do-i-emulate-os-kill-in-windows '''
    ...         import win32api
    ...         import win32con
    ...         import pywintypes
    ...         handle = win32api.OpenProcess(win32con.PROCESS_TERMINATE , pywintypes.FALSE, pid)
    ...         # Nothing is returned, so calling this in a doctest prints nothing.
    ...         win32api.TerminateProcess(handle, 0)

    # Test to acquire the lock in another process.
    >>> def execute(cmd):
    ...     cmd = 'import time;' + cmd + 'time.sleep(10);'
    ...     process = subprocess.Popen([sys.executable, '-c', cmd])
    ...     pid = process.pid
    ...     time.sleep(2) # quick hack, but we test synchronization in the end
    ...     return pid

    >>> pid = execute("import interlocks;a=interlocks.InterProcessLock(name='" + test_name + "');a.acquire('child');")

    >>> lock1.acquire('lock1')
    False

    >>> os_independent_kill(pid)

    >>> time.sleep(1)

    >>> lock1.acquire('lock1')
    True
    >>> lock1.release()

    # Testing wait

    >>> pid = execute("import interlocks;a=interlocks.InterProcessLock(name='" + test_name + "');a.acquire('child');")

    >>> lock1.acquire('lock1')
    False

    >>> os_independent_kill(pid)

    # The second positional argument is wait.
    >>> lock1.acquire('lock1', True)
    True
    >>> lock1.release()

    """

    pass

276 

if __name__ == '__main__':
    # Run the doctests embedded in this module when executed as a script;
    # exception messages are not compared, only exception types.
    import doctest
    _doctest_flags = doctest.IGNORE_EXCEPTION_DETAIL
    doctest.testmod(optionflags=_doctest_flags)