Coverage for IdentifySite.py: 0%
290 statements
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-18 00:10 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2024-11-18 00:10 +0000
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
4# Copyright 2010-2011 Chaz Littlejohn
5# This program is free software: you can redistribute it and/or modify
6# it under the terms of the GNU Affero General Public License as published by
7# the Free Software Foundation, version 3 of the License.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16# In the "official" distribution you can find the license in agpl-3.0.txt.
18from __future__ import print_function
21# import L10n
22# _ = L10n.get_translation()
24import re
25import sys
26import os
27from time import time
28import codecs
31import Configuration
32import logging
34try:
35 import xlrd
36except ImportError:
37 xlrd = None
38# logging has been set up in fpdb.py or HUD_main.py, use their settings:
39log = logging.getLogger("parser")
41re_Divider, re_Head, re_XLS = {}, {}, {}
42re_Divider["PokerStars"] = re.compile(r"^Hand #(\d+)\s*$", re.MULTILINE)
43re_Divider["Fulltilt"] = re.compile(r"\*{20}\s#\s\d+\s\*{15,25}\s?", re.MULTILINE)
44re_Head["Fulltilt"] = re.compile(r"^((BEGIN)?\n)?FullTiltPoker.+\n\nSeat", re.MULTILINE)
45re_XLS["PokerStars"] = re.compile(r"Tournaments\splayed\sby\s\'.+?\'")
46re_XLS["Fulltilt"] = re.compile(r"Player\sTournament\sReport\sfor\s.+?\s\(.*\)")
49class FPDBFile(object):
50 path = ""
51 ftype = None # Valid: hh, summary, both
52 site = None
53 kodec = None
54 archive = False
55 archiveHead = False
56 archiveDivider = False
57 gametype = False
58 hero = "-"
60 def __init__(self, path):
61 self.path = path
64class Site(object):
65 def __init__(self, name, hhc_fname, filter_name, summary, obj):
66 self.name = name
67 # FIXME: rename filter to hhc_fname
68 self.hhc_fname = hhc_fname
69 # FIXME: rename filter_name to hhc_type
70 self.filter_name = filter_name
71 self.re_SplitHands = obj.re_SplitHands
72 self.codepage = obj.codepage
73 self.copyGameHeader = obj.copyGameHeader
74 self.summaryInFile = obj.summaryInFile
75 self.re_Identify = obj.re_Identify
76 # self.obj = obj
77 if summary:
78 self.summary = summary
79 self.re_SumIdentify = getattr(__import__(summary), summary, None).re_Identify
80 else:
81 self.summary = None
82 self.line_delimiter = self.getDelimiter(filter_name)
83 self.line_addendum = self.getAddendum(filter_name)
84 self.spaces = filter_name == "Entraction"
85 self.getHeroRegex(obj, filter_name)
87 def getDelimiter(self, filter_name):
88 line_delimiter = None
89 if filter_name == "PokerStars":
90 line_delimiter = "\n\n"
91 elif filter_name == "Fulltilt" or filter_name == "PokerTracker":
92 line_delimiter = "\n\n\n"
93 elif self.re_SplitHands.match("\n\n") and filter_name != "Entraction":
94 line_delimiter = "\n\n"
95 elif self.re_SplitHands.match("\n\n\n"):
96 line_delimiter = "\n\n\n"
98 return line_delimiter
100 def getAddendum(self, filter_name):
101 line_addendum = ""
102 if filter_name == "OnGame":
103 line_addendum = "*"
104 elif filter_name == "Merge":
105 line_addendum = "<"
106 elif filter_name == "Entraction":
107 line_addendum = "\n\n"
109 return line_addendum
111 def getHeroRegex(self, obj, filter_name):
112 self.re_HeroCards = None
113 if hasattr(obj, "re_HeroCards"):
114 if filter_name not in ("Bovada", "Enet"):
115 self.re_HeroCards = obj.re_HeroCards
116 if filter_name == "PokerTracker":
117 self.re_HeroCards1 = obj.re_HeroCards1
118 self.re_HeroCards2 = obj.re_HeroCards2
121class IdentifySite(object):
122 def __init__(self, config, hhcs=None):
123 self.config = config
124 self.codepage = ("utf8", "utf-16", "cp1252", "ISO-8859-1")
125 self.sitelist = {}
126 self.filelist = {}
127 self.generateSiteList(hhcs)
129 def scan(self, path):
130 if os.path.isdir(path):
131 self.walkDirectory(path, self.sitelist)
132 else:
133 self.processFile(path)
135 def get_fobj(self, file):
136 try:
137 fobj = self.filelist[file]
138 except KeyError:
139 return False
140 return fobj
142 def get_filelist(self):
143 return self.filelist
145 def clear_filelist(self):
146 self.filelist = {}
148 def generateSiteList(self, hhcs):
149 """Generates a ordered dictionary of site, filter and filter name for each site in hhcs"""
150 if not hhcs:
151 hhcs = self.config.hhcs
152 for site, hhc in list(hhcs.items()):
153 filter = hhc.converter
154 filter_name = filter.replace("ToFpdb", "")
155 summary = hhc.summaryImporter
156 mod = __import__(filter)
157 obj = getattr(mod, filter_name, None)
158 try:
159 self.sitelist[obj.siteId] = Site(site, filter, filter_name, summary, obj)
160 except Exception as e:
161 log.error("Failed to load HH importer: %s. %s" % (filter_name, e))
162 self.re_Identify_PT = getattr(__import__("PokerTrackerToFpdb"), "PokerTracker", None).re_Identify
163 self.re_SumIdentify_PT = getattr(__import__("PokerTrackerSummary"), "PokerTrackerSummary", None).re_Identify
165 def walkDirectory(self, dir, sitelist):
166 """Walks a directory, and executes a callback on each file"""
167 dir = os.path.abspath(dir)
168 for file in [file for file in os.listdir(dir) if not file in [".", ".."]]:
169 nfile = os.path.join(dir, file)
170 if os.path.isdir(nfile):
171 self.walkDirectory(nfile, sitelist)
172 else:
173 self.processFile(nfile)
175 def __listof(self, x):
176 if isinstance(x, list) or isinstance(x, tuple):
177 return x
178 else:
179 return [x]
181 def processFile(self, path):
182 log.debug("process fill identify", path)
183 if path not in self.filelist:
184 log.debug("filelist", self.filelist)
185 whole_file, kodec = self.read_file(path)
186 # log.debug('whole_file',whole_file)
187 log.debug("kodec", kodec)
188 if whole_file:
189 fobj = self.idSite(path, whole_file, kodec)
190 log.debug("siteid obj")
191 # print(fobj.path)
192 if fobj is False: # Site id failed
193 log.debug(("DEBUG:") + " " + ("siteId Failed for: %s") % path)
194 else:
195 self.filelist[path] = fobj
197 def read_file(self, in_path):
198 # Ignore macOS-specific hidden files such as .DS_Store
199 if in_path.endswith(".DS_Store"):
200 log.warning(f"Skipping system file {in_path}")
201 return None, None
203 # Excel file management if xlrd is available
204 if (in_path.endswith(".xls") or in_path.endswith(".xlsx")) and xlrd:
205 try:
206 wb = xlrd.open_workbook(in_path)
207 sh = wb.sheet_by_index(0)
208 header = str(sh.cell(0, 0).value)
209 return header, "utf-8"
210 except (xlrd.XLRDError, IOError) as e:
211 log.error(f"Error reading Excel file {in_path}: {e}")
212 return None, None
214 # Check for the presence of a BOM for UTF-16
215 try:
216 with open(in_path, "rb") as infile:
217 raw_data = infile.read()
219 # If the file begins with a UTF-16 BOM (little endian or big endian)
220 if raw_data.startswith(b"\xff\xfe") or raw_data.startswith(b"\xfe\xff"):
221 try:
222 whole_file = raw_data.decode("utf-16")
223 return whole_file, "utf-16"
224 except UnicodeDecodeError as e:
225 log.error(f"Error decoding UTF-16 file {in_path}: {e}")
226 return None, None
227 except IOError as e:
228 log.error(f"Error reading file {in_path}: {e}")
229 return None, None
231 # Try different encodings in the `self.codepage` list
232 for kodec in self.codepage:
233 try:
234 with codecs.open(in_path, "r", kodec) as infile:
235 whole_file = infile.read()
236 return whole_file, kodec
237 except (IOError, UnicodeDecodeError) as e:
238 log.warning(f"Failed to read file {in_path} with codec {kodec}: {e}")
239 continue
241 log.error(f"Unable to read file {in_path} with any known codecs.")
242 return None, None
244 def idSite(self, path, whole_file, kodec):
245 """Identifies the site the hh file originated from"""
246 f = FPDBFile(path)
247 f.kodec = kodec
248 # DEBUG:print('idsite path',path )
249 # DEBUG:print('idsite f',f,f.ftype,f.site,f.gametype )
251 # DEBUG:print('idsite self.sitelist.items',self.sitelist.items())
252 for id, site in list(self.sitelist.items()):
253 filter_name = site.filter_name
254 m = site.re_Identify.search(whole_file[:5000])
255 if m and filter_name in ("Fulltilt", "PokerStars"):
256 m1 = re_Divider[filter_name].search(whole_file.replace("\r\n", "\n"))
257 if m1:
258 f.archive = True
259 f.archiveDivider = True
260 elif re_Head.get(filter_name) and re_Head[filter_name].match(whole_file[:5000].replace("\r\n", "\n")):
261 f.archive = True
262 f.archiveHead = True
263 if m:
264 f.site = site
265 f.ftype = "hh"
266 if f.site.re_HeroCards:
267 h = f.site.re_HeroCards.search(whole_file[:5000])
268 if h and "PNAME" in h.groupdict():
269 f.hero = h.group("PNAME")
270 else:
271 f.hero = "Hero"
272 return f
274 for id, site in list(self.sitelist.items()):
275 if site.summary:
276 if path.endswith(".xls") or path.endswith(".xlsx"):
277 filter_name = site.filter_name
278 if filter_name in ("Fulltilt", "PokerStars"):
279 m2 = re_XLS[filter_name].search(whole_file[:5000])
280 if m2:
281 f.site = site
282 f.ftype = "summary"
283 return f
284 else:
285 m3 = site.re_SumIdentify.search(whole_file[:10000])
286 if m3:
287 f.site = site
288 f.ftype = "summary"
289 return f
291 m1 = self.re_Identify_PT.search(whole_file[:5000])
292 m2 = self.re_SumIdentify_PT.search(whole_file[:100])
293 if m1 or m2:
294 filter = "PokerTrackerToFpdb"
295 filter_name = "PokerTracker"
296 mod = __import__(filter)
297 obj = getattr(mod, filter_name, None)
298 summary = "PokerTrackerSummary"
299 f.site = Site("PokerTracker", filter, filter_name, summary, obj)
300 if m1:
301 f.ftype = "hh"
302 if re.search("\*{2}\sGame\sID\s", m1.group()):
303 f.site.line_delimiter = None
304 f.site.re_SplitHands = re.compile("End\sof\sgame\s\d+")
305 elif re.search("\*{2}\sHand\s\#\s", m1.group()):
306 f.site.line_delimiter = None
307 f.site.re_SplitHands = re.compile("Rake:\s[^\s]+")
308 elif re.search("Server\spoker\d+\.ipoker\.com", whole_file[:250]):
309 f.site.line_delimiter = None
310 f.site.spaces = True
311 f.site.re_SplitHands = re.compile("GAME\s\#")
312 m3 = f.site.re_HeroCards1.search(whole_file[:5000])
313 if m3:
314 f.hero = m3.group("PNAME")
315 else:
316 m4 = f.site.re_HeroCards2.search(whole_file[:5000])
317 if m4:
318 f.hero = m4.group("PNAME")
319 else:
320 f.ftype = "summary"
321 return f
323 return False
325 def getFilesForSite(self, sitename, ftype):
326 files_for_site = []
327 for name, f in list(self.filelist.items()):
328 if f.ftype is not None and f.site.name == sitename and f.ftype == "hh":
329 files_for_site.append(f)
330 return files_for_site
332 def fetchGameTypes(self):
333 for name, f in list(self.filelist.items()):
334 if f.ftype is not None and f.ftype == "hh":
335 try: # TODO: this is a dirty hack. Borrowed from fpdb_import
336 name = str(name, "utf8", "replace")
337 except TypeError:
338 log.error(TypeError)
339 mod = __import__(f.site.hhc_fname)
340 obj = getattr(mod, f.site.filter_name, None)
341 hhc = obj(self.config, in_path=name, sitename=f.site.hhc_fname, autostart=False)
342 if hhc.readFile():
343 f.gametype = hhc.determineGameType(hhc.whole_file)
346def main(argv=None):
347 if argv is None:
348 argv = sys.argv[1:]
350 Configuration.set_logfile("fpdb-log.txt")
351 config = Configuration.Config(file="HUD_config.test.xml")
352 in_path = os.path.abspath("regression-test-files")
353 IdSite = IdentifySite(config)
354 start = time()
355 IdSite.scan(in_path)
356 print("duration", time() - start)
358 print("\n----------- SITE LIST -----------")
359 for sid, site in list(IdSite.sitelist.items()):
360 print("%2d: Name: %s HHC: %s Summary: %s" % (sid, site.name, site.filter_name, site.summary))
361 print("----------- END SITE LIST -----------")
363 print("\n----------- ID REGRESSION FILES -----------")
364 count = 0
365 for f, ffile in list(IdSite.filelist.items()):
366 tmp = ""
367 tmp += ": Type: %s " % ffile.ftype
368 count += 1
369 if ffile.ftype == "hh":
370 tmp += "Conv: %s" % ffile.site.hhc_fname
371 elif ffile.ftype == "summary":
372 tmp += "Conv: %s" % ffile.site.summary
373 print(f, tmp)
374 print(count, "files identified")
375 print("----------- END ID REGRESSION FILES -----------")
377 print("----------- RETRIEVE FOR SINGLE SITE -----------")
378 IdSite.getFilesForSite("PokerStars", "hh")
379 print("----------- END RETRIEVE FOR SINGLE SITE -----------")
382if __name__ == "__main__":
383 sys.exit(main())