Coverage for IdentifySite.py: 0%

290 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2024-11-18 00:10 +0000

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3 

4# Copyright 2010-2011 Chaz Littlejohn 

5# This program is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU Affero General Public License as published by 

7# the Free Software Foundation, version 3 of the License. 

8# 

9# This program is distributed in the hope that it will be useful, 

10# but WITHOUT ANY WARRANTY; without even the implied warranty of 

11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

12# GNU General Public License for more details. 

13# 

14# You should have received a copy of the GNU Affero General Public License 

15# along with this program. If not, see <http://www.gnu.org/licenses/>. 

16# In the "official" distribution you can find the license in agpl-3.0.txt. 

17 

18from __future__ import print_function 

19 

20 

21# import L10n 

22# _ = L10n.get_translation() 

23 

24import re 

25import sys 

26import os 

27from time import time 

28import codecs 

29 

30 

31import Configuration 

32import logging 

33 

34try: 

35 import xlrd 

36except ImportError: 

37 xlrd = None 

38# logging has been set up in fpdb.py or HUD_main.py, use their settings: 

39log = logging.getLogger("parser") 

40 

41re_Divider, re_Head, re_XLS = {}, {}, {} 

42re_Divider["PokerStars"] = re.compile(r"^Hand #(\d+)\s*$", re.MULTILINE) 

43re_Divider["Fulltilt"] = re.compile(r"\*{20}\s#\s\d+\s\*{15,25}\s?", re.MULTILINE) 

44re_Head["Fulltilt"] = re.compile(r"^((BEGIN)?\n)?FullTiltPoker.+\n\nSeat", re.MULTILINE) 

45re_XLS["PokerStars"] = re.compile(r"Tournaments\splayed\sby\s\'.+?\'") 

46re_XLS["Fulltilt"] = re.compile(r"Player\sTournament\sReport\sfor\s.+?\s\(.*\)") 

47 

48 

49class FPDBFile(object): 

50 path = "" 

51 ftype = None # Valid: hh, summary, both 

52 site = None 

53 kodec = None 

54 archive = False 

55 archiveHead = False 

56 archiveDivider = False 

57 gametype = False 

58 hero = "-" 

59 

60 def __init__(self, path): 

61 self.path = path 

62 

63 

64class Site(object): 

65 def __init__(self, name, hhc_fname, filter_name, summary, obj): 

66 self.name = name 

67 # FIXME: rename filter to hhc_fname 

68 self.hhc_fname = hhc_fname 

69 # FIXME: rename filter_name to hhc_type 

70 self.filter_name = filter_name 

71 self.re_SplitHands = obj.re_SplitHands 

72 self.codepage = obj.codepage 

73 self.copyGameHeader = obj.copyGameHeader 

74 self.summaryInFile = obj.summaryInFile 

75 self.re_Identify = obj.re_Identify 

76 # self.obj = obj 

77 if summary: 

78 self.summary = summary 

79 self.re_SumIdentify = getattr(__import__(summary), summary, None).re_Identify 

80 else: 

81 self.summary = None 

82 self.line_delimiter = self.getDelimiter(filter_name) 

83 self.line_addendum = self.getAddendum(filter_name) 

84 self.spaces = filter_name == "Entraction" 

85 self.getHeroRegex(obj, filter_name) 

86 

87 def getDelimiter(self, filter_name): 

88 line_delimiter = None 

89 if filter_name == "PokerStars": 

90 line_delimiter = "\n\n" 

91 elif filter_name == "Fulltilt" or filter_name == "PokerTracker": 

92 line_delimiter = "\n\n\n" 

93 elif self.re_SplitHands.match("\n\n") and filter_name != "Entraction": 

94 line_delimiter = "\n\n" 

95 elif self.re_SplitHands.match("\n\n\n"): 

96 line_delimiter = "\n\n\n" 

97 

98 return line_delimiter 

99 

100 def getAddendum(self, filter_name): 

101 line_addendum = "" 

102 if filter_name == "OnGame": 

103 line_addendum = "*" 

104 elif filter_name == "Merge": 

105 line_addendum = "<" 

106 elif filter_name == "Entraction": 

107 line_addendum = "\n\n" 

108 

109 return line_addendum 

110 

111 def getHeroRegex(self, obj, filter_name): 

112 self.re_HeroCards = None 

113 if hasattr(obj, "re_HeroCards"): 

114 if filter_name not in ("Bovada", "Enet"): 

115 self.re_HeroCards = obj.re_HeroCards 

116 if filter_name == "PokerTracker": 

117 self.re_HeroCards1 = obj.re_HeroCards1 

118 self.re_HeroCards2 = obj.re_HeroCards2 

119 

120 

121class IdentifySite(object): 

122 def __init__(self, config, hhcs=None): 

123 self.config = config 

124 self.codepage = ("utf8", "utf-16", "cp1252", "ISO-8859-1") 

125 self.sitelist = {} 

126 self.filelist = {} 

127 self.generateSiteList(hhcs) 

128 

129 def scan(self, path): 

130 if os.path.isdir(path): 

131 self.walkDirectory(path, self.sitelist) 

132 else: 

133 self.processFile(path) 

134 

135 def get_fobj(self, file): 

136 try: 

137 fobj = self.filelist[file] 

138 except KeyError: 

139 return False 

140 return fobj 

141 

142 def get_filelist(self): 

143 return self.filelist 

144 

145 def clear_filelist(self): 

146 self.filelist = {} 

147 

148 def generateSiteList(self, hhcs): 

149 """Generates a ordered dictionary of site, filter and filter name for each site in hhcs""" 

150 if not hhcs: 

151 hhcs = self.config.hhcs 

152 for site, hhc in list(hhcs.items()): 

153 filter = hhc.converter 

154 filter_name = filter.replace("ToFpdb", "") 

155 summary = hhc.summaryImporter 

156 mod = __import__(filter) 

157 obj = getattr(mod, filter_name, None) 

158 try: 

159 self.sitelist[obj.siteId] = Site(site, filter, filter_name, summary, obj) 

160 except Exception as e: 

161 log.error("Failed to load HH importer: %s. %s" % (filter_name, e)) 

162 self.re_Identify_PT = getattr(__import__("PokerTrackerToFpdb"), "PokerTracker", None).re_Identify 

163 self.re_SumIdentify_PT = getattr(__import__("PokerTrackerSummary"), "PokerTrackerSummary", None).re_Identify 

164 

165 def walkDirectory(self, dir, sitelist): 

166 """Walks a directory, and executes a callback on each file""" 

167 dir = os.path.abspath(dir) 

168 for file in [file for file in os.listdir(dir) if not file in [".", ".."]]: 

169 nfile = os.path.join(dir, file) 

170 if os.path.isdir(nfile): 

171 self.walkDirectory(nfile, sitelist) 

172 else: 

173 self.processFile(nfile) 

174 

175 def __listof(self, x): 

176 if isinstance(x, list) or isinstance(x, tuple): 

177 return x 

178 else: 

179 return [x] 

180 

181 def processFile(self, path): 

182 log.debug("process fill identify", path) 

183 if path not in self.filelist: 

184 log.debug("filelist", self.filelist) 

185 whole_file, kodec = self.read_file(path) 

186 # log.debug('whole_file',whole_file) 

187 log.debug("kodec", kodec) 

188 if whole_file: 

189 fobj = self.idSite(path, whole_file, kodec) 

190 log.debug("siteid obj") 

191 # print(fobj.path) 

192 if fobj is False: # Site id failed 

193 log.debug(("DEBUG:") + " " + ("siteId Failed for: %s") % path) 

194 else: 

195 self.filelist[path] = fobj 

196 

197 def read_file(self, in_path): 

198 # Ignore macOS-specific hidden files such as .DS_Store 

199 if in_path.endswith(".DS_Store"): 

200 log.warning(f"Skipping system file {in_path}") 

201 return None, None 

202 

203 # Excel file management if xlrd is available 

204 if (in_path.endswith(".xls") or in_path.endswith(".xlsx")) and xlrd: 

205 try: 

206 wb = xlrd.open_workbook(in_path) 

207 sh = wb.sheet_by_index(0) 

208 header = str(sh.cell(0, 0).value) 

209 return header, "utf-8" 

210 except (xlrd.XLRDError, IOError) as e: 

211 log.error(f"Error reading Excel file {in_path}: {e}") 

212 return None, None 

213 

214 # Check for the presence of a BOM for UTF-16 

215 try: 

216 with open(in_path, "rb") as infile: 

217 raw_data = infile.read() 

218 

219 # If the file begins with a UTF-16 BOM (little endian or big endian) 

220 if raw_data.startswith(b"\xff\xfe") or raw_data.startswith(b"\xfe\xff"): 

221 try: 

222 whole_file = raw_data.decode("utf-16") 

223 return whole_file, "utf-16" 

224 except UnicodeDecodeError as e: 

225 log.error(f"Error decoding UTF-16 file {in_path}: {e}") 

226 return None, None 

227 except IOError as e: 

228 log.error(f"Error reading file {in_path}: {e}") 

229 return None, None 

230 

231 # Try different encodings in the `self.codepage` list 

232 for kodec in self.codepage: 

233 try: 

234 with codecs.open(in_path, "r", kodec) as infile: 

235 whole_file = infile.read() 

236 return whole_file, kodec 

237 except (IOError, UnicodeDecodeError) as e: 

238 log.warning(f"Failed to read file {in_path} with codec {kodec}: {e}") 

239 continue 

240 

241 log.error(f"Unable to read file {in_path} with any known codecs.") 

242 return None, None 

243 

244 def idSite(self, path, whole_file, kodec): 

245 """Identifies the site the hh file originated from""" 

246 f = FPDBFile(path) 

247 f.kodec = kodec 

248 # DEBUG:print('idsite path',path ) 

249 # DEBUG:print('idsite f',f,f.ftype,f.site,f.gametype ) 

250 

251 # DEBUG:print('idsite self.sitelist.items',self.sitelist.items()) 

252 for id, site in list(self.sitelist.items()): 

253 filter_name = site.filter_name 

254 m = site.re_Identify.search(whole_file[:5000]) 

255 if m and filter_name in ("Fulltilt", "PokerStars"): 

256 m1 = re_Divider[filter_name].search(whole_file.replace("\r\n", "\n")) 

257 if m1: 

258 f.archive = True 

259 f.archiveDivider = True 

260 elif re_Head.get(filter_name) and re_Head[filter_name].match(whole_file[:5000].replace("\r\n", "\n")): 

261 f.archive = True 

262 f.archiveHead = True 

263 if m: 

264 f.site = site 

265 f.ftype = "hh" 

266 if f.site.re_HeroCards: 

267 h = f.site.re_HeroCards.search(whole_file[:5000]) 

268 if h and "PNAME" in h.groupdict(): 

269 f.hero = h.group("PNAME") 

270 else: 

271 f.hero = "Hero" 

272 return f 

273 

274 for id, site in list(self.sitelist.items()): 

275 if site.summary: 

276 if path.endswith(".xls") or path.endswith(".xlsx"): 

277 filter_name = site.filter_name 

278 if filter_name in ("Fulltilt", "PokerStars"): 

279 m2 = re_XLS[filter_name].search(whole_file[:5000]) 

280 if m2: 

281 f.site = site 

282 f.ftype = "summary" 

283 return f 

284 else: 

285 m3 = site.re_SumIdentify.search(whole_file[:10000]) 

286 if m3: 

287 f.site = site 

288 f.ftype = "summary" 

289 return f 

290 

291 m1 = self.re_Identify_PT.search(whole_file[:5000]) 

292 m2 = self.re_SumIdentify_PT.search(whole_file[:100]) 

293 if m1 or m2: 

294 filter = "PokerTrackerToFpdb" 

295 filter_name = "PokerTracker" 

296 mod = __import__(filter) 

297 obj = getattr(mod, filter_name, None) 

298 summary = "PokerTrackerSummary" 

299 f.site = Site("PokerTracker", filter, filter_name, summary, obj) 

300 if m1: 

301 f.ftype = "hh" 

302 if re.search("\*{2}\sGame\sID\s", m1.group()): 

303 f.site.line_delimiter = None 

304 f.site.re_SplitHands = re.compile("End\sof\sgame\s\d+") 

305 elif re.search("\*{2}\sHand\s\#\s", m1.group()): 

306 f.site.line_delimiter = None 

307 f.site.re_SplitHands = re.compile("Rake:\s[^\s]+") 

308 elif re.search("Server\spoker\d+\.ipoker\.com", whole_file[:250]): 

309 f.site.line_delimiter = None 

310 f.site.spaces = True 

311 f.site.re_SplitHands = re.compile("GAME\s\#") 

312 m3 = f.site.re_HeroCards1.search(whole_file[:5000]) 

313 if m3: 

314 f.hero = m3.group("PNAME") 

315 else: 

316 m4 = f.site.re_HeroCards2.search(whole_file[:5000]) 

317 if m4: 

318 f.hero = m4.group("PNAME") 

319 else: 

320 f.ftype = "summary" 

321 return f 

322 

323 return False 

324 

325 def getFilesForSite(self, sitename, ftype): 

326 files_for_site = [] 

327 for name, f in list(self.filelist.items()): 

328 if f.ftype is not None and f.site.name == sitename and f.ftype == "hh": 

329 files_for_site.append(f) 

330 return files_for_site 

331 

332 def fetchGameTypes(self): 

333 for name, f in list(self.filelist.items()): 

334 if f.ftype is not None and f.ftype == "hh": 

335 try: # TODO: this is a dirty hack. Borrowed from fpdb_import 

336 name = str(name, "utf8", "replace") 

337 except TypeError: 

338 log.error(TypeError) 

339 mod = __import__(f.site.hhc_fname) 

340 obj = getattr(mod, f.site.filter_name, None) 

341 hhc = obj(self.config, in_path=name, sitename=f.site.hhc_fname, autostart=False) 

342 if hhc.readFile(): 

343 f.gametype = hhc.determineGameType(hhc.whole_file) 

344 

345 

346def main(argv=None): 

347 if argv is None: 

348 argv = sys.argv[1:] 

349 

350 Configuration.set_logfile("fpdb-log.txt") 

351 config = Configuration.Config(file="HUD_config.test.xml") 

352 in_path = os.path.abspath("regression-test-files") 

353 IdSite = IdentifySite(config) 

354 start = time() 

355 IdSite.scan(in_path) 

356 print("duration", time() - start) 

357 

358 print("\n----------- SITE LIST -----------") 

359 for sid, site in list(IdSite.sitelist.items()): 

360 print("%2d: Name: %s HHC: %s Summary: %s" % (sid, site.name, site.filter_name, site.summary)) 

361 print("----------- END SITE LIST -----------") 

362 

363 print("\n----------- ID REGRESSION FILES -----------") 

364 count = 0 

365 for f, ffile in list(IdSite.filelist.items()): 

366 tmp = "" 

367 tmp += ": Type: %s " % ffile.ftype 

368 count += 1 

369 if ffile.ftype == "hh": 

370 tmp += "Conv: %s" % ffile.site.hhc_fname 

371 elif ffile.ftype == "summary": 

372 tmp += "Conv: %s" % ffile.site.summary 

373 print(f, tmp) 

374 print(count, "files identified") 

375 print("----------- END ID REGRESSION FILES -----------") 

376 

377 print("----------- RETRIEVE FOR SINGLE SITE -----------") 

378 IdSite.getFilesForSite("PokerStars", "hh") 

379 print("----------- END RETRIEVE FOR SINGLE SITE -----------") 

380 

381 

382if __name__ == "__main__": 

383 sys.exit(main())