Skip to content
Snippets Groups Projects
Commit ed4197e0 authored by Anton Sarukhanov's avatar Anton Sarukhanov
Browse files

Cleanup & refactoring.

parent fc42aa1a
No related branches found
No related tags found
No related merge requests found
[flake8]
max-line-length = 88
FLASK_APP=app.py
venv
__pycache__
.mypy_cache
image: python:3-buster
before_script:
- pip install -r requirements-dev.txt
test:
script:
- bandit *.py
- flake8 *.py
- mypy *.py
- pydocstyle *.py
- pylint *.py
......@@ -10,10 +10,11 @@ This is a quick weekend hack, very much not "production-ready". Use at your own
## Setup
* Install the `libxml2-dev` and `libxslt1-dev` libraries.
* Create a Python 3 [virtualenv](https://virtualenv.pypa.io/en/stable/)
* `pip install -r requirements.txt` to install Python dependencies
## Run
* `FLASK_APP=app.py flask run -h 0.0.0.0`
* `flask run`
* Visit [localhost:5000](http://127.0.0.1:5000) in a browser.
"""Main entrypoint for the Flask application."""
from flask import Flask, render_template, redirect, request, url_for
from flask_caching import Cache
from scraper import Scraper
from scraper import FTPScraper
import models
# pylint: disable=invalid-name
app = Flask(__name__)
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
def make_cache_key(*args, **kwargs):
    """Build a Flask-Caching key from the request path and query string.

    The positional and keyword arguments are accepted for call compatibility
    and ignored; only the current ``request`` is consulted.

    The query parameters are sorted and URL-encoded instead of being passed
    through ``hash()``: string hashes are salted per interpreter process, so a
    hashed key is not reproducible across workers or restarts, and two
    different queries could collide on the same key.
    """
    from urllib.parse import urlencode  # local import keeps the block standalone

    query = urlencode(sorted(request.args.items()))
    return '{}?{}'.format(request.path, query).encode('utf-8')
DISPLAY_DATETIME_FORMAT = '%A, %B %d, %Y at %-I:%M %p'
DISPLAY_DATE_FORMAT = '%A, %B %-d, %Y'
DISPLAY_TIME_FORMAT = '%-I:%M %p on %A'
@app.after_request
def add_header(response):
    """Set the ``Cache-Control`` max-age of the outgoing response to 300 seconds."""
    cache_control = response.cache_control
    cache_control.max_age = 300
    return response
@app.template_filter('strftime')
def _jinja2_filter_datetime(datetime, date=True, time=True):
    """Format a datetime showing the date, the time, or both."""
    if date and time:
        display_format = DISPLAY_DATETIME_FORMAT
    elif date:
        display_format = DISPLAY_DATE_FORMAT
    elif time:
        display_format = DISPLAY_TIME_FORMAT
    else:
        # Neither part requested: format with an empty pattern.
        display_format = ''
    return datetime.strftime(display_format)
@app.route("/")
def index():
    """Serve the landing page rendered from ``index.html``."""
    landing_page = render_template('index.html')
    return landing_page
def _make_cache_key():
    """Create a cache key for Flask-Caching from the request path and query.

    The query parameters are sorted and URL-encoded instead of being passed
    through ``hash()``: string hashes are salted per interpreter process, so a
    hashed key is not reproducible across workers or restarts, and two
    different queries could collide on the same key.
    """
    from urllib.parse import urlencode  # local import keeps the block standalone

    query = urlencode(sorted(request.args.items()))
    return '{}?{}'.format(request.path, query).encode('utf-8')
@app.route("/live")
@cache.cached(timeout=300, key_prefix=_make_cache_key)
def live(results_url=None):
    """Render the primary view of live tournament stats.

    The ``results_url`` parameter is kept for signature compatibility; the
    value actually used is always read from the ``results_url`` query
    parameter. Requests without it are redirected to the landing page.
    """
    results_url = request.args.get('results_url')
    if not results_url:
        return redirect(url_for('index'))
    # One cache decorator, one scraper and one return: the view used to be
    # wrapped in the cache twice, built a ``Scraper`` that was immediately
    # overwritten, and ended with a second ``return`` that could never run.
    tournament = FTPScraper(results_url).scrape()
    return render_template('live.html', tournament=tournament, events=tournament.events,
                           phases=models.EventPhase)
if __name__ == "__main__":
......
"""Domain-specific class definitions."""
from collections import namedtuple
from dataclasses import dataclass, field
from datetime import datetime
from typing import List
from enum import Enum
class Fencer(namedtuple('Fencer', 'name is_checked_in')):
    """A fencer entered in an event, identified by name alone.

    Two fencers are considered the same when their names match, whatever
    their check-in status, so ``==``, ``!=`` and ``hash()`` all use ``name``
    only. Patching just ``__eq__`` onto the namedtuple left the tuple versions
    of ``__ne__`` and ``__hash__`` in place, which still looked at
    ``is_checked_in`` and so disagreed with ``__eq__``.
    """

    __slots__ = ()

    def __eq__(self, other):
        """Compare by name; defer to the other operand if it has no name."""
        try:
            return self.name == other.name
        except AttributeError:
            return NotImplemented

    def __ne__(self, other):
        """Negate ``__eq__`` instead of inheriting the tuple comparison."""
        equal = self.__eq__(other)
        return equal if equal is NotImplemented else not equal

    def __hash__(self):
        """Hash by name so that equal fencers share a hash."""
        return hash(self.name)
@dataclass
class Tournament:
def __init__(self, name, url, updated='', events=None):
self.name = name
self.url = url
self.updated = updated
self.events = events or []
"""A fencing competition with live results powered by Fencing Time."""
def add_event(self, event):
self.events.append(event)
event.tournament = self
name: str
url: str
updated: datetime
events: List['Event'] = field(default_factory=list)
def count_all_fencers(self):
def count_fencers(self):
"""Count the fencers in all events."""
for event in self.events:
event.count_fencers()
class EventStatus:
    """A named status that an event can be in."""

    def __init__(self, name):
        """Remember the status name."""
        setattr(self, 'name', name)
class EventPhase(Enum):
    """The current state of an Event."""

    REGISTRATION = "Registration"
    STARTED = "Started"
    FINISHED = "Finished"

    def __str__(self):
        """Provide the member name when used as a string."""
        return "{}".format(self.name)

    def __repr__(self):
        member_name = self.name
        return member_name
@dataclass
class EventStatistics:
    """Fencer counts for an event."""

    # Fencers of the event whose check-in flag is set.
    fencers_checked_in: List[Fencer] = field(default_factory=list)
    # Fencers not checked in (presumably also not seen in an earlier event,
    # per the name; confirm against Event.count_fencers).
    new_fencers_not_checked_in: List[Fencer] = field(default_factory=list)
    # Per-event tallies keyed by event name; presumably name -> fencer count.
    previously_fenced: dict = field(default_factory=dict)
    # Running total kept alongside previously_fenced; starts at zero.
    previous_total: int = 0
@dataclass
class Event:
STATUS_REGISTRATION = EventStatus("Registration")
STATUS_STARTED = EventStatus("Started")
STATUS_FINISHED = EventStatus("Finished")
def __init__(self, name, time, status, url, fencers, tournament=None):
self.name = name
self.time = time
self.status = status
self.url = url
self.fencers = fencers
self.tournament = tournament
"""A single event in a tournament (e.g. Y12 Mens Foil)."""
name: str
url: str
time: datetime
tournament: Tournament
phase: EventPhase
stats: EventStatistics = field(default_factory=EventStatistics)
fencers: List[Fencer] = field(default_factory=list)
def __repr__(self):
"""Use event name as the canonical representation."""
return self.name
def count_fencers(self):
"""Count the fencers in an event.
Result is divided by status (checked in, not checked in, etc.)"""
self.fencers_checked_in = []
self.new_fencers_not_checked_in = []
self.previously_fenced = {}
self.previous_total = 0
if self.name == 'Y-12 Men\'s Foil':
print(self.fencers)
Result is divided by status (checked in, not checked in, etc.)
"""
self.stats.fencers_checked_in = []
self.stats.new_fencers_not_checked_in = []
self.stats.previously_fenced = {}
self.stats.previous_total = 0
for fencer in self.fencers:
if fencer.is_checked_in:
self.fencers_checked_in.append(fencer)
self.stats.fencers_checked_in.append(fencer)
else:
self.new_fencers_not_checked_in.append(fencer)
self.stats.new_fencers_not_checked_in.append(fencer)
for event in self.tournament.events:
if event.name == self.name:
break
if fencer in event.fencers:
if event.name in self.previously_fenced:
self.previously_fenced[event.name] += 1
if event.name in self.stats.previously_fenced:
self.stats.previously_fenced[event.name] += 1
else:
self.previously_fenced[event.name] = 1
self.previous_total += 1
self.stats.previously_fenced[event.name] = 1
self.stats.previous_total += 1
try:
self.new_fencers_not_checked_in.remove(fencer)
self.stats.new_fencers_not_checked_in.remove(fencer)
except ValueError:
pass # already removed; ignore
break
class Fencer:
    """A fencer in an event, identified by a whitespace-stripped name."""

    def __init__(self, name, is_checked_in):
        """Store the stripped name and the check-in flag."""
        self.name = name.strip()
        self.is_checked_in = is_checked_in

    def __repr__(self):
        return '<Fencer name="{}" is_checked_in="{}">'.format(
            self.name, self.is_checked_in)

    def __eq__(self, other):
        """Compare by name only; objects without a name are never equal."""
        try:
            return self.name == other.name
        except AttributeError:
            return NotImplemented

    def __hash__(self):
        """Hash by name, consistent with ``__eq__``."""
        # Defining __eq__ alone sets __hash__ to None, which would make
        # fencers unusable in sets and as dict keys.
        return hash(self.name)
[mypy]
# No mypy stubs exist for these libraries; ignore their missing imports rather than writing stubs.
[mypy-flask_caching.*]
ignore_missing_imports = True
[mypy-lxml.*]
ignore_missing_imports = True
-r requirements.txt
bandit==1.6.2
flake8==3.7.9
mypy==0.750
pydocstyle==4.0.1
pylint==2.4.4
python-dotenv==0.10.3
lxml==3.6.4
requests==2.12.1
Flask==0.11.1
Flask-Caching==1.3.3
lxml==4.4.2
requests==2.22.0
Flask==1.1.1
Flask-Caching==1.8.0
from lxml import html
import requests
"""Extract fencing event registration stats from Fencing Time webpages."""
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from urllib.parse import urlparse, urljoin
from lxml import html # nosec Bandit suggests defusedxml but defusedxml.lxml is dead
import requests
from models import Event, EventPhase, Fencer, Tournament
from models import Event, Fencer, Tournament
class FTPScraper:
"""Scraper for tournaments hosted on an FTP server.
This reads the original Fencing Time results pages, hosted by individual
tournament operators.
"""
UPDATED_DATETIME_FORMAT = '%m/%d/%Y - %I:%M %p'
EVENT_DATETIME_FORMAT = '%A, %B %d, %Y - %I:%M %p'
class Scraper:
def __init__(self, tournament_url):
"""Set up the scraper instance."""
self.tournament_url = tournament_url
self.tournament = None
def scrape(self):
# Get tournament info
"""Get all tournament information."""
try:
results = requests.get(self.tournament_url)
except requests.exceptions.MissingSchema:
results = requests.get("http://{}".format(self.tournament_url))
results_tree = html.fromstring(results.content)
try:
tournament_name = results_tree.xpath(
'//span[@class="tournName"]/text()')[0]
updated = (results_tree.xpath(
'//span[@class="lastUpdate"]/text()')[0]
.replace('Last Updated:', '').strip())
tournament_name = results_tree.xpath('//span[@class="tournName"]/text()')[0]
updated_str = (results_tree.xpath('//span[@class="lastUpdate"]/text()')[0]
.replace('Last Updated:', '').strip())
updated = datetime.strptime(updated_str, self.UPDATED_DATETIME_FORMAT)
except IndexError:
raise ScrapeError("Tournament info not found.")
self.tournament = Tournament(tournament_name, results.url, updated)
self.tournament = Tournament(name=tournament_name, url=results.url,
updated=updated)
# Get tournament events
try:
......@@ -38,12 +50,12 @@ class Scraper:
raise ScrapeError("No event schedule found.")
loop = asyncio.new_event_loop()
loop.run_until_complete(self.scrape_events(event_urls))
loop.run_until_complete(self._scrape_events(event_urls))
return self.tournament
async def scrape_events(self, event_urls):
async def _scrape_events(self, event_urls):
"""Get event information asynchronously."""
with ThreadPoolExecutor(max_workers=20) as executor:
loop = asyncio.get_event_loop()
futures = []
......@@ -51,54 +63,56 @@ class Scraper:
for event_url in event_urls:
if not urlparse(event_url).netloc:
event_url = urljoin(self.tournament.url, event_url)
futures.append(loop.run_in_executor(
executor,
requests.get,
event_url))
futures.append(loop.run_in_executor(executor, requests.get, event_url))
for response in await asyncio.gather(*futures):
event = self.parse_event(response)
self.tournament.add_event(event)
self.tournament.events.append(event)
self.tournament.count_all_fencers()
self.tournament.count_fencers()
def parse_event(self, event):
    """Build an ``Event`` from a fetched event results page.

    ``event`` is an HTTP response: its ``content`` is parsed as HTML and its
    ``url`` is stored on the resulting ``Event``.

    Raises:
        ScrapeError: if the name/time details are missing, the start time does
            not match ``EVENT_DATETIME_FORMAT``, or the page has none of the
            "Final Results", "Seeding" or "Check-In Status" links.
    """
    event_tree = html.fromstring(event.content)
    event_details = event_tree.xpath('//span[@class="tournDetails"]/text()')
    try:
        event_name = event_details[0]
        event_time = datetime.strptime(event_details[1], self.EVENT_DATETIME_FORMAT)
    except (IndexError, ValueError) as err:
        # IndexError: a detail is missing; ValueError: strptime() could not
        # parse the time string. Both mean the page is not usable.
        raise ScrapeError("Failed to interpret live results for event \"{}\"."
                          .format(event_details)) from err

    # The link present on the page decides the phase and which table is read
    # for the fencer names.
    if event_tree.xpath('//a[text()="Final Results"]'):
        event_phase = EventPhase.FINISHED
        names = event_tree.xpath('//div[@id="finalResults"]/table/tr/td[2]/text()')
        fencers = [Fencer(name, True) for name in names]
    elif event_tree.xpath('//a[text()="Seeding"]'):
        event_phase = EventPhase.STARTED
        names = event_tree.xpath('//div[@id="Round1Seeding"]/table/tr/td[2]/text()')
        fencers = [Fencer(name, True) for name in names]
    elif event_tree.xpath('//a[text()="Check-In Status"]'):
        event_phase = EventPhase.REGISTRATION
        # A first-column cell that has child elements counts as checked in.
        checked_in = [
            bool(list(cell))
            for cell in event_tree.xpath('//div[@id="checkIn"]/table/tr/td[1]')]
        names = event_tree.xpath('//div[@id="checkIn"]/table/tr/td[2]/text()')
        fencers = [Fencer(name, flag) for (name, flag) in zip(names, checked_in)]
    else:
        # Without this branch the return below would hit unbound locals.
        raise ScrapeError("Failed to determine the phase of event \"{}\"."
                          .format(event_name))

    return Event(name=event_name, time=event_time, phase=event_phase,
                 url=event.url, fencers=fencers, tournament=self.tournament)
class FTLiveScraper(FTPScraper):
    """Scraper for tournaments hosted on fencingtimelive.com.

    This reads the newer-style pages, centrally hosted by Fencing Time.
    """

    # to do...
class ScrapeError(Exception):
    """A problem with scraping."""
......@@ -14,9 +14,6 @@ div.time {
div.name {
font-size: 1.1em;
}
div.time {
font-size: .80em;
}
section {
border-radius: 1em;
border: 2px solid #555;
......@@ -50,8 +47,8 @@ a:hover {
font-size: 1.15em;
}
/* Event statuses */
.status {
/* Event phases */
.phase {
float: right;
color: #000;
border-radius: .25em;
......@@ -63,13 +60,13 @@ a:hover {
vertical-align: top;
text-transform: uppercase;
}
.status-registration .status {
.phase-registration .phase {
background-color: #2f2;
}
.status-started .status {
.phase-started .phase {
background-color: #fc0;
}
.status-finished .status {
.phase-finished .phase {
background-color: #f22;
}
......@@ -107,6 +104,13 @@ body.page-live main {
max-width: 100%;
box-sizing: border-box;
}
body.page-live main > header {
padding: 0 .5em;
flex-basis: 100%;
}
body.page-live main > section {
margin: .5em;
}
body.page-live p, ul {
margin: .25em 0;
}
......
......@@ -10,37 +10,40 @@
<img class="ext-link" src="{{ url_for('static', filename='images/font-awesome/external-link-alt.svg') }}?t=20180415"></a></h1>
</header>
<main>
{% for e in events %}
<section class="status-{{ e['status']|lower }}">
{% for e in events | sort(attribute='time') %}
{% if loop.changed(e.time.date()) %}
<header><h2>{{ e.time | strftime(time=False) }}</h2></header>
{% endif %}
<section class="phase-{{ e.phase|lower }}">
<header>
<div class="status">{{ e['status']|lower }}</div>
<div class="phase">{{ e.phase|lower }}</div>
<div class="event-actions">
<a href="{{ e.url }}" target="_blank">
<img class="ext-link" src="{{ url_for('static', filename='images/font-awesome/external-link-alt.svg') }}?t=20180415"></a>
</div>
<div class="name">
<a name="{{ e['name'] }}" href="#{{ e['name'] }}">{{ e['name'] }}</a>
<a name="{{ e.name }}" href="#{{ e.name }}">{{ e.name }}</a>
</div>
<div class="time">{{ e['time'] }}</div>
<div class="time">{{ e.time | strftime(date=False) }}</div>
</header>
<p><span class="number">{{ e['fencers_checked_in'] | length }}</span> of <span class="number">{{ e['fencers'] | length }}</span> fencer(s) checked in.</p>
{% if e['previously_fenced'] %}
<p><span class="number">{{ e['previous_total'] }}</span> fenced in prior events:</p>
<p><span class="number">{{ e.stats.fencers_checked_in | length }}</span> of <span class="number">{{ e.fencers | length }}</span> fencer(s) checked in.</p>
{% if e.stats.previously_fenced %}
<p><span class="number">{{ e.stats.previous_total }}</span> fenced in prior events:</p>
<ul>
{% for pe in e['previously_fenced'] %}
<li>{{ pe }} - {{ e['previously_fenced'][pe] }}</li>
{% for pe in e.stats.previously_fenced %}
<li>{{ pe }} - {{ e.stats.previously_fenced[pe] }}</li>
{% endfor %}
</ul>
{% endif %}
{% if e['new_fencers_not_checked_in'] %}
<p class="hl">{{ e['new_fencers_not_checked_in'] | length }} new and not checked in</p>
{% if e.stats.new_fencers_not_checked_in %}
<p class="hl">{{ e.stats.new_fencers_not_checked_in | length }} new and not checked in</p>
{% endif %}
</section>
{% endfor %}
</main>
<footer>
{% if tournament.updated %}
<div class="updated">Last updated: <span class="date">{{ tournament.updated }}</span></div>
<div class="updated">Last updated: <span class="date">{{ tournament.updated | strftime }}</span></div>
{% endif %}
</footer>
{% endblock content %}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment