Coverage for interlocks.py: 0%

126 statements  

« prev     ^ index     » next       coverage.py v7.6.3, created at 2024-10-15 19:33 +0000

1# -*- coding: utf-8 -*- 

2 

3# Code from http://ender.snowburst.org:4747/~jjohns/interlocks.py 

4# Thanks JJ! 

5 

6from __future__ import print_function 

7 

8# import L10n 

9# _ = L10n.get_translation() 

10 

11import sys 

12import os, os.path 

13import time 

14import base64 

15 

16InterProcessLock = None 

17 

18""" 

19Just use me like a thread lock. acquire() / release() / locked() 

20 

21Differences compared to thread locks: 

221. By default, acquire()'s wait parameter is false. 

232. When acquire fails, SingleInstanceError is thrown instead of simply returning false. 

243. acquire() can take a 3rd parameter retry_time, which, if wait is True, tells the locking  

25 mechanism how long to sleep between retrying the lock. Has no effect for unix/InterProcessLockFcntl. 

26 

27Differences in fpdb version to JJ's original: 

281. Changed acquire() to return false like other locks 

292. Made acquire fail if same process already has the lock 

30""" 

31 

32 

class SingleInstanceError(RuntimeError):
    """Signal that an InterProcessLock could not be taken.

    Raised by the ``acquire_impl()`` implementations when another holder
    (typically another running instance of the program) already owns the lock.
    """

35 

36 

class InterProcessLockBase(object):
    """Bookkeeping shared by every inter-process lock implementation.

    Subclasses provide the real locking primitive by overriding
    ``acquire_impl()`` (which must raise ``SingleInstanceError`` when the lock
    is taken by someone else) and ``release_impl()``.

    Attributes:
        name: Identifier of the lock; defaults to ``sys.argv[0]``.
        heldBy: The ``source`` label given to the successful ``acquire()``
            call, or None while this object does not hold the lock.
    """

    def __init__(self, name=None):
        """Create an unlocked lock object called ``name`` (or ``sys.argv[0]``)."""
        self._has_lock = False
        if not name:
            name = sys.argv[0]
        self.name = name
        self.heldBy = None

    def getHashedName(self):
        """Return ``self.name`` base64-encoded with the '=' padding removed."""
        encoded = base64.b64encode(self.name.encode())
        return encoded.replace(b"=", b"").decode()

    def acquire_impl(self, wait):
        """Take the underlying lock; the base implementation does nothing.

        Subclasses raise ``SingleInstanceError`` when the lock is unavailable.
        """
        pass

    def release_impl(self):
        """Give the underlying lock back; the base implementation does nothing."""
        pass

    def acquire(self, source=None, wait=False, retry_time=1):
        """Try to take the lock and report whether this object now holds it.

        Args:
            source: Label remembered in ``heldBy`` on success; None is stored
                as "Unknown".
            wait: If true, keep retrying until the lock is obtained instead of
                returning False on the first failure.
            retry_time: Seconds to sleep between retries when ``wait`` is true.

        Returns:
            True when the lock was obtained. False when it is unavailable and
            ``wait`` is false, or when this very object already holds it.
        """
        if source is None:
            source = "Unknown"
        if self._has_lock:  # make sure 2nd acquire in same process fails
            print(("lock already held by:"), self.heldBy)
            return False
        while not self._has_lock:
            try:
                self.acquire_impl(wait)
            except SingleInstanceError:
                if not wait:
                    # Behave like a normal lock: report failure, do not raise.
                    return False
                time.sleep(retry_time)
            else:
                self._has_lock = True
                self.heldBy = source
        return True

    def release(self):
        """Release the underlying lock and clear the ownership bookkeeping."""
        self.release_impl()
        self._has_lock = False
        self.heldBy = None

    def locked(self):
        """Return True if the lock is currently held by this or another object.

        When this object is not the holder, the state is probed by briefly
        acquiring and releasing the lock without waiting.
        """
        if self._has_lock:
            return True
        if self.acquire():
            self.release()
            return False
        return True

87 

88 

# Directory in which InterProcessLockFcntl creates its lock files.
LOCK_FILE_DIRECTORY = "/tmp"


class InterProcessLockFcntl(InterProcessLockBase):
    """Inter-process lock backed by ``fcntl.flock()`` on a file in LOCK_FILE_DIRECTORY."""

    def __init__(self, name=None):
        """Compute the lock file path for ``name``; the file is not opened yet."""
        InterProcessLockBase.__init__(self, name)
        self.lockfd = 0  # open lock file object while the lock is held, 0 otherwise
        self.lock_file_name = os.path.join(LOCK_FILE_DIRECTORY, self.getHashedName() + ".lck")
        assert os.path.isdir(LOCK_FILE_DIRECTORY)

    def getHashedName(self):
        """Return ``self.name`` with characters unsafe in a file name replaced by '_'.

        Overrides the base64 name of the base class so that the lock file keeps
        a descriptive name.
        """
        import re

        # A character class: each listed character is replaced individually.
        # In particular '/' must go, otherwise os.path.join() would drop
        # LOCK_FILE_DIRECTORY for absolute names.
        bad_filename_character_re = re.compile(r"[/\?<>\\:;\*\|'\"\^=\.\[\]]")
        return bad_filename_character_re.sub("_", self.name)

    def acquire_impl(self, wait):
        """Open the lock file and flock() it exclusively.

        Args:
            wait: If true, block inside flock() until the lock is free;
                otherwise fail immediately when it is taken.

        Raises:
            SingleInstanceError: The lock is held elsewhere, or the lock file
                was removed/replaced by a releasing holder while we obtained it.
        """
        lock_file = open(self.lock_file_name, "w")
        fcntl_options = fcntl.LOCK_EX
        if not wait:
            fcntl_options |= fcntl.LOCK_NB
        try:
            fcntl.flock(lock_file, fcntl_options)
            # release_impl() unlinks the file, so the inode we just locked may
            # no longer be the one the path refers to; such a lock is worthless.
            if not os.path.samestat(os.fstat(lock_file.fileno()), os.stat(self.lock_file_name)):
                raise IOError("lock file was replaced while acquiring it")
        except (IOError, OSError):
            lock_file.close()  # closing also drops any flock() we obtained
            raise SingleInstanceError("Could not acquire exclusive lock on " + self.lock_file_name)
        self.lockfd = lock_file

    def release_impl(self):
        """Remove the lock file, then unlock and close it."""
        # Unlink while the lock is still held: anyone who locks the old inode
        # afterwards detects the replacement in acquire_impl().
        try:
            os.unlink(self.lock_file_name)
        except OSError:
            # We don't care about the existence of the file too much here.
            # It's the flock() we care about.
            pass
        # Must match the flock() used to take the lock (lockf() is a different lock family).
        fcntl.flock(self.lockfd, fcntl.LOCK_UN)
        self.lockfd.close()
        self.lockfd = 0

128 

129 

class InterProcessLockWin32(InterProcessLockBase):
    """Inter-process lock implemented with a named Win32 mutex (pywin32)."""

    def __init__(self, name=None):
        """Set up the base bookkeeping; no mutex handle exists until acquire."""
        super(InterProcessLockWin32, self).__init__(name)
        self.mutex = None

    def acquire_impl(self, wait):
        """Create the named mutex, failing if it already exists.

        ``wait`` is not used here; retrying is handled by the caller.

        Raises:
            SingleInstanceError: GetLastError() reported ERROR_ALREADY_EXISTS
                after the mutex was created.
        """
        self.mutex = win32event.CreateMutex(None, 0, self.getHashedName())
        already_exists = win32api.GetLastError() == winerror.ERROR_ALREADY_EXISTS
        if not already_exists:
            return
        # Drop our handle again so that we do not keep the mutex alive.
        self.mutex.Close()
        self.mutex = None
        raise SingleInstanceError("Could not acquire exclusive lock on " + self.name)

    def release_impl(self):
        """Close the mutex handle obtained in acquire_impl()."""
        self.mutex.Close()

144 

145 

class InterProcessLockSocket(InterProcessLockBase):
    """Fallback inter-process lock that holds a loopback TCP port while locked."""

    def __init__(self, name=None):
        """Derive the port number for ``name``; no socket is opened yet."""
        import zlib

        InterProcessLockBase.__init__(self, name)
        self.socket = None
        # The port must be identical in every process using the same name.
        # str.__hash__() is randomised per interpreter run on Python 3, so a
        # CRC of the name is used instead.
        stable_hash = zlib.crc32(self.getHashedName().encode()) & 0xFFFFFFFF
        self.portno = 65530 - stable_hash % 32749

    def acquire_impl(self, wait):
        """Bind 127.0.0.1:portno; the bound socket is the lock.

        ``wait`` is not used here; retrying is handled by the caller.

        Raises:
            SingleInstanceError: The port could not be bound.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.bind(("127.0.0.1", self.portno))
        except socket.error:
            self.socket.close()
            self.socket = None
            raise SingleInstanceError("Could not acquire exclusive lock on " + self.name)

    def release_impl(self):
        """Close the bound socket, freeing the port."""
        self.socket.close()
        self.socket = None

164 

165 

166# Set InterProcessLock to the correct type given the system parameters available

167try: 

168 import fcntl 

169 

170 InterProcessLock = InterProcessLockFcntl 

171except ImportError: 

172 try: 

173 import win32event 

174 import win32api 

175 import winerror 

176 

177 InterProcessLock = InterProcessLockWin32 

178 except ImportError: 

179 import socket 

180 

181 InterProcessLock = InterProcessLockSocket 

182 

183 

def test_construct():
    """Doctest exercising InterProcessLock inside one process and across processes.

    The cross-process part starts ``python -c`` children that import this
    module as ``interlocks``, so run it from the directory containing the file.

    NOTE(review): the Windows os_independent_kill below only opens the process
    handle; the TerminateProcess call is commented out - confirm whether that
    is intentional.

    # Making the name of the test unique so it can be executed by multiple users on the same machine.
    >>> import subprocess
    >>> test_name = 'InterProcessLockTest' +str(os.getpid()) + str(time.time())

    >>> lock1 = InterProcessLock(name=test_name)
    >>> lock1.acquire('lock1')
    True

    >>> lock2 = InterProcessLock(name=test_name)
    >>> lock3 = InterProcessLock(name=test_name)

    # Since lock1 is locked, other attempts to acquire it fail.
    >>> lock2.acquire('lock2')
    False

    >>> lock3.acquire('lock3')
    False

    # Release the lock and let lock2 have it.
    >>> lock1.release()
    >>> lock2.acquire('lock2')
    True

    >>> lock3.acquire('lock3')
    False

    # Release it and give it back to lock1
    >>> lock2.release()
    >>> lock1.acquire('lock1')
    True

    >>> lock2.acquire('lock2')
    False

    # Test lock status
    >>> lock2.locked()
    True
    >>> lock3.locked()
    True
    >>> lock1.locked()
    True

    >>> lock1.release()

    >>> lock2.locked()
    False
    >>> lock3.locked()
    False
    >>> lock1.locked()
    False

    >>> if os.name == 'posix':
    ...    def os_independent_kill(pid):
    ...        import signal
    ...        os.kill(pid, signal.SIGKILL)
    ... else:
    ...    assert(os.name == 'nt')
    ...    def os_independent_kill(pid):
    ...        ''' http://www.python.org/doc/faq/windows/#how-do-i-emulate-os-kill-in-windows '''
    ...        import win32api
    ...        import win32con
    ...        import pywintypes
    ...        handle = win32api.OpenProcess(win32con.PROCESS_TERMINATE , pywintypes.FALSE, pid)
    ...        #return (0 != win32api.TerminateProcess(handle, 0))

    # Test to acquire the lock in another process.
    >>> def execute(cmd):
    ...    cmd = 'import time;' + cmd + 'time.sleep(10);'
    ...    process = subprocess.Popen([sys.executable, '-c', cmd])
    ...    pid = process.pid
    ...    time.sleep(2) # quick hack, but we test synchronization in the end
    ...    return pid

    >>> pid = execute('import interlocks;a=interlocks.InterProcessLock(name=\\''+test_name+ '\\');a.acquire("child");')

    >>> lock1.acquire('lock1')
    False

    >>> os_independent_kill(pid)

    >>> time.sleep(1)

    >>> lock1.acquire('lock1')
    True
    >>> lock1.release()

    # Testing wait

    >>> pid = execute('import interlocks;a=interlocks.InterProcessLock(name=\\''+test_name+ '\\');a.acquire("child");')

    >>> lock1.acquire('lock1')
    False

    >>> os_independent_kill(pid)

    >>> lock1.acquire('lock1', wait=True)
    True
    >>> lock1.release()

    """

    pass

287 

288 

if __name__ == "__main__":
    # Executed as a script: run the doctests embedded in this module,
    # comparing only exception types (not messages) in expected tracebacks.
    from doctest import IGNORE_EXCEPTION_DETAIL, testmod

    testmod(optionflags=IGNORE_EXCEPTION_DETAIL)