Add parser for Pennsylvania

main
Eric Ihli 1 year ago
commit fdafaae267

.gitignore

@@ -0,0 +1,2 @@
*.egg-info/
*.pyc

CHANGELOG.md

@@ -0,0 +1,12 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Parser for Pennsylvania.

Makefile

@@ -0,0 +1,15 @@
##
# Lottery Data Scraper
#
# @file
# @version 0.1
FORCE:

test: FORCE
	python3 -m unittest discover tests

style: FORCE
	black .
# end

README.md

@@ -0,0 +1,101 @@
# Parsing of lottery websites
## Demo
The following script should put you in a state where the last line will make a
bunch of requests to the Pennsylvania lottery website, parse the tables of
games/prizes, and print to your terminal a JSON structure of all of the games.
``` sh
git clone https://github.com/owogawc/lottery_data_scraper
cd lottery_data_scraper
python3 -m venv ~/.virtualenvs/lottery_data_scraper
. ~/.virtualenvs/lottery_data_scraper/bin/activate
pip3 install -e .
PY_LOG_LVL=DEBUG USE_CACHE=true python3 -m lottery_data_scraper.pennsylvania
```
If you have [jq](https://stedolan.github.io/jq/) installed, you can get some
formatted output by piping it to `jq` (and redirecting STDERR to /dev/null).
``` sh
PY_LOG_LVL=DEBUG USE_CACHE=true python3 -m lottery_data_scraper.pennsylvania 2> /dev/null | jq
```
## Data models
We're using [`marshmallow`](https://marshmallow.readthedocs.io/en/stable/index.html) to validate and serialize data.
I'm including the schemas here just so you can quickly get a general idea of
what data fields we're able to scrape from most lottery websites. What you see
in this README might not be up-to-date with what's in
[schemas.py](./lottery_data_scraper/schemas.py).
As of 2023-04-07 the schemas are a work-in-progress. The remaining TODO is to
determine and specify which fields are absolutely required and which are
optional.
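For a quick feel for how the schemas are used, here is a minimal sketch (the field values are made up for illustration):

``` python
from lottery_data_scraper.schemas import GameSchema

# A game is just a plain dict; the schema handles validation and serialization.
game = {"game_id": "1234", "name": "Example Game", "price": 10, "state": "pa"}

schema = GameSchema()
errors = schema.validate(game)  # an empty dict means the game passed validation
print(schema.dumps(game))       # JSON string, ready to print or pipe to jq
```

`validate` returns a dict of field errors, so an empty dict means the object is good to serialize.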
### Game Schema
``` python
class GameSchema(Schema):
    class Meta:
        render_module = json

    id = fields.Integer()
    created_at = fields.DateTime(load_default=datetime.utcnow)
    game_id = fields.Str(required=True)
    name = fields.Str()
    description = fields.Str()
    image_urls = fields.Function(
        lambda x: json.dumps(x.get("image_urls", [])),
        deserialize=lambda x: json.loads(x),
    )
    how_to_play = fields.Str()
    num_tx_initial = fields.Integer()
    price = fields.Number()
    prizes = fields.Nested(PrizeSchema, many=True)
    state = fields.Str()
    updated_at = fields.DateTime()
    url = fields.Str()
```
### Prize Schema
``` python
class PrizeSchema(Schema):
    class Meta:
        render_module = json

    id = fields.Integer()
    game_id = fields.Integer()
    available = fields.Integer()
    claimed = fields.Integer()
    created_at = fields.DateTime(load_default=datetime.utcnow)
    value = fields.Number()
    prize = fields.Str()
```
## Tests
Testing is kind of tricky because you can't rely on _just_ Python with its
`requests` library. Some states have scrape protections that require you to
actually run JavaScript. Some states have extreme scrape protection that
requires you to actually run a _display_. They check for some rendering
context that doesn't exist when you run a headless browser in Selenium. To
scrape those sites, you actually have to run an [X virtual
framebuffer](https://en.wikipedia.org/wiki/Xvfb). Testing in these cases isn't
as simple as running `python3 -m unittest discover`.
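If you do need the virtual display, a run looks roughly like this. This is a minimal sketch, assuming the `xvfbwrapper` package from `setup.py` is installed and using a hypothetical `scrape` function standing in for your Selenium-driven scraper:

``` python
from xvfbwrapper import Xvfb

# Xvfb provides a real (but virtual) X display, which satisfies sites
# that detect the missing rendering context of a headless browser.
vdisplay = Xvfb(width=1280, height=720)
vdisplay.start()
try:
    games = scrape()  # hypothetical: your Selenium-driven scraper
finally:
    vdisplay.stop()
```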
## Contributing
``` sh
git clone https://github.com/owogawc/lottery_data_scraper
cd lottery_data_scraper
python3 -m venv ~/.virtualenvs/lottery_data_scraper
. ~/.virtualenvs/lottery_data_scraper/bin/activate
pip3 install -e .
```
Then you should be able to run `make test` and see the tests pass.

TODO.md

@@ -0,0 +1,12 @@
# TODO
- [ ] Add more states.
- [ ] California
- [ ] Georgia
- [ ] Louisiana
- [ ] Florida
- [ ] Texas
- [ ] New Mexico
- [ ] [Publish to PYPI](https://packaging.python.org/en/latest/tutorials/packaging-projects/).
- [ ] Do we still need Xvfb? Which states have that level of scrape protection?
- [ ] Decide on and add a license.

lottery_data_scraper/__init__.py

@@ -0,0 +1,40 @@
"""
Configure logging for the entire package.
You can specify a log level with the environment variable
PY_LOG_LVL=[debug|info|warning|error|critical]
"""
import logging
import logging.config
import os

# Prefix the basic format with a timestamp, file pathname, and line number.
# See: https://docs.python.org/3/library/logging.html#logrecord-attributes
LOG_FORMAT = "%(asctime)s %(pathname)s %(lineno)s {}".format(logging.BASIC_FORMAT)

log_level = getattr(logging, os.environ.get("PY_LOG_LVL", "WARNING").upper())

logging_config = {
    "version": 1,
    "formatters": {
        "standard": {
            "format": LOG_FORMAT,
        },
    },
    "handlers": {
        "default": {
            "level": log_level,
            "formatter": "standard",
            "class": "logging.StreamHandler",
        },
    },
    "loggers": {
        "": {
            "handlers": ["default"],
            "level": log_level,
            "propagate": True,
        },
    },
}

logging.config.dictConfig(logging_config)

lottery_data_scraper/pennsylvania.py

@@ -0,0 +1,385 @@
"""
Scrapes the Pennsylvania lottery website for scratch-off ticket
data and calculates the expected value for each game.
Pennsylvania publishes the number of tickets printed and how many
tickets are printed at each prize level.
We can calculated the expected value of a game by summing
the value of all the prizes and dividing that by the cost
of all the tickets.
The palottery website has an "index" page that has links to every game.
Each individual game has a link to a "game rules" page.
We can start at the index and visit every game rules page, then we
can find the html table on that page which has the detailed prize
information and run our calculations.
Website that we'll be scraping:
https://www.palottery.state.pa.us/Scratch-Offs/Active-Games.aspx
Example usage:
python -m pennsylvania
Or:
LOGLEVEL=DEBUG USE_CACHE=True python -m pennsylvania
The following behavior is configurable through shell environment variables.
Set LOGLEVEL to print useful debug info to console.
LOGLEVEL=[DEBUG,INFO,WARNING,ERROR,CRITICAL]
Defaults to WARNING.
Set USE_CACHE to cache responses. This speeds up development
and is nice to the servers we're hitting.
USE_CACHE=[True]
Defaults to False. Note: Setting this env variable to the string False
will cause it to use cache because the string "False" evaluates to Truthy.
Either set it to True or don't set it.
"""
import base64
import locale
import logging
import os
import re
import sys
import traceback
from copy import deepcopy
from tempfile import gettempdir

import requests
from bs4 import BeautifulSoup as bs

from lottery_data_scraper.schemas import GameSchema

logger = logging.getLogger(__name__)
locale.setlocale(locale.LC_MONETARY, "en_US.UTF-8")

# It's worth assigning to constants values that are used in many
# places throughout a script.
BASE_URL = "https://www.palottery.state.pa.us"
INDEX_URL = f"{BASE_URL}/Scratch-Offs/Active-Games.aspx"


def fetch_html(url):
    """
    Helper to fetch and cache html responses.

    During development and while testing, we'll be hitting the same urls often.
    The content of the pages probably won't be changing.
    Caching the results will speed up development,
    and the servers will appreciate us for not spamming requests.

    The responses are cached in the operating system's tempfile directory.
    That's probably /tmp/ or /var/tmp/ on Unix flavors and C:/temp/ on Windows.

    The filename is based on the URL. But since the URL might contain
    characters that are invalid for filenames, we base64 encode the URL.
    """
    safe_filename = base64.urlsafe_b64encode(bytes(url, "utf-8")).decode("utf-8")
    filepath = os.path.join(gettempdir(), safe_filename)
    if os.path.isfile(filepath) and os.environ.get("USE_CACHE", False):
        with open(filepath, "r") as f:
            return f.read()
    else:
        # We are relying on the outside world when we make a request, so we
        # might want to wrap this in a try/except. But we'd
        # only want to do that in two cases.
        #
        # 1. We have a way of handling exceptions.
        #    A good example would be to catch exceptions and retry the
        #    request; maybe the network was down.
        #
        # 2. We can't handle the exception, but we want to log something
        #    more useful than the stack trace that will get spit out if
        #    we just let the exception go uncaught.
        #
        # In this case, I don't think it's worth muddying up the code
        # trying to handle exceptions here. It's easy enough to just re-run
        # the script.
        html = requests.get(url).text
        if os.environ.get("USE_CACHE", False):
            with open(filepath, "w+") as f:
                f.write(html)
        return html


def find_game_names(html):
    """
    Game names can be found on the index page
    in the text of anchor elements
    which have the class "activeGame_li".
    """
    soup = bs(html, "lxml")
    game_elements = soup.find_all("a", class_="activeGame_li")
    return [
        re.sub(r"\s+", " ", g.find("div", class_="info").text) for g in game_elements
    ]


def find_game_urls(html):
    """
    Luckily, all of the Pennsylvania games are listed on a single html page.
    We don't have to mess around with any pagination and making multiple requests.

    The links are "href" attributes of anchor tags with the class "activeGame_li".
    """
    soup = bs(html, "lxml")
    game_elements = soup.find_all("a", class_="activeGame_li")
    return ["{}{}".format(BASE_URL, e.attrs["href"]) for e in game_elements]


def find_complete_game_rules_url(html):
    """
    Game pages have a link to the complete game rules.
    The complete game rules have a table of all prizes for a game.

    The link to the game rules page is in an anchor tag
    nested under a div with the class "instant-games-games-info".
    """
    soup = bs(html, "lxml")
    games_info_div = soup.find("div", class_="instant-games-games-info")
    games_info_anchor = games_info_div.find_all("a")[1]
    games_info_url = games_info_anchor.attrs["href"]
    return games_info_url


def find_rows(html):
    """
    From a game rules page, find the rows of the table
    that have the number of tickets and the value of each prize.
    """
    soup = bs(html, "lxml")
    # Some game rules pages have multiple tables.
    # The first table has the prizes.
    # soup.find returns the first matching element.
    # soup.find_all returns a list of all matching elements.
    prize_table = soup.find("table")
    row_elements = prize_table.find_all("tr")
    # The first row is headers, so we sort of want
    # to skip it for the calculations, but it includes
    # an important bit of information that we want.
    # The rows only contain winning ticket info.
    # We also care about a row for the losing prize tier.
    # It will have a value of "0", but we want to know
    # how many losing tickets there are.
    #
    # We can calculate that from the first header. It
    # contains the total number of tickets printed.
    # Let's get the total number of tickets printed so
    # we can subtract the sum of the number of winning
    # tickets, giving us the number of losing tickets.
    header_row = row_elements[0]
    header_columns = header_row.find_all("th")
    total_number_tickets = int(re.sub(r"\D", "", header_columns[-1].text))
    row_elements = row_elements[1:]

    # We only care about the last column and the third-to-last column.
    # The following helper functions will help us parse
    # the data we care about from each row.
    #
    # The last column is the number of tickets at this prize level.
    # The number of tickets has commas, like 1,350,500.
    # We'll have to parse them out.
    #
    # The third-to-last column is the prize value.
    # Prize value is usually "$" followed by a number.
    # Those are easy to parse.
    # But for the free ticket prize it's "FREE $1 TICKET".
    def parse_value(row_element):
        columns = row_element.find_all("td")
        try:
            value_element = columns[-3]
            value_text = value_element.text
            return int(re.sub(r"\D", "", value_text))
        except Exception:
            # This is an exception we can handle.
            # We can simply return a value of 0 if
            # the row doesn't have what we expect.
            # Our result might be inaccurate, but
            # I'll consider that acceptable.
            # I'll log something useful so I know
            # to look into it.
            logger.warning("Exception parsing value for a row: %s", row_element.text)
            return 0

    def parse_num_tickets(row_element):
        columns = row_element.find_all("td")
        try:
            num_tickets_element = columns[-1]
            num_tickets_text = num_tickets_element.text
            return int(num_tickets_text.replace(",", ""))
        except Exception:
            # Same as above, we can handle this.
            # Logging and returning 0 is better than blowing up.
            logger.warning(
                "Exception parsing num_tickets for a row.\n%s", row_element.text
            )
            return 0

    # Iterate over each row and parse out the value of the prize tier
    # and the number of remaining tickets at that prize tier.
    rows = [(parse_value(e), parse_num_tickets(e)) for e in row_elements]
    number_winning_tickets = sum(r[1] for r in rows)
    # Insert the losing ticket value, $0, and the number
    # of losing tickets into our rows.
    rows.insert(0, (0, total_number_tickets - number_winning_tickets))
    return rows


def find_price(html):
    """
    Price is hard to find. It seems to always be a sibling to an
    <i> tag which has the text "Price". So, we can find that <i>
    tag, get the text of its parent, find the last word of that text,
    and that will be the price of the ticket as a string that looks like
    "$10.", which we can then strip of the non-digits.
    """
    soup = bs(html, "lxml")
    price_element = soup.find(string="Price")
    price_text = price_element.parent.parent.text.split(" ")[-1]
    price = int(re.sub(r"\D", "", price_text))
    return price


def calculate_original_ev(game_url):
    """
    The "expected value" or "return on investment" of a game
    will be the total value of the remaining prizes
    divided by the total cost of the remaining tickets.

    Imagine you bought every ticket that was printed.
    How much money would you spend? How much money would you get back in prizes?
    If you won $1,500,000 and spent $2,000,000
    then your expected value is 1,500,000 / 2,000,000 = 0.75.
    For every $1 spent on the game, you'll get back $0.75
    for an average loss of $0.25.
    """
    game_html = fetch_html(game_url)
    game_rules_url = find_complete_game_rules_url(game_html)
    game_rules_html = fetch_html(game_rules_url)
    price = find_price(game_rules_html)
    rows = find_rows(game_rules_html)
    total_number_tickets = sum(r[1] for r in rows)
    total_value_tickets = sum(r[1] * r[0] for r in rows)
    total_cost_tickets = total_number_tickets * price
    ev = total_value_tickets / total_cost_tickets
    return ev

def combine_prizes(prizes):
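    """
    Merge adjacent prize tiers that share the same prize value.

    `prizes` is a list of [num_tickets, odds, value] rows in the order they
    appear in the rules table. When consecutive rows have the same value,
    their ticket counts are summed into a single row.
    """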
    combined = []
    last_prize = prizes[0]
    for prize in prizes[1:]:
        if last_prize[-1] == prize[-1]:
            last_prize[0] += prize[0]
        else:
            combined.append(last_prize)
            last_prize = prize
    combined.append(last_prize)
    return combined


def parse_game_html(name, url, html):
    game = {}
    game_soup = bs(html, "lxml")
    game["name"] = name.strip()
    game["url"] = url
    game["game_id"] = re.match(r".*?(\d+$)", url).group(1)
    game_rules_url = find_complete_game_rules_url(html)
    game_rules_html = fetch_html(game_rules_url)
    game_rules_soup = bs(game_rules_html, "lxml")
    game["price"] = find_price(game_rules_html)
    prize_table = game_rules_soup.find("table", class_="miscr")

    def prize_value(p, price):
        p = p.text.strip()
        if re.search(r"FREE", p):
            return price
        else:
            return p.replace("$", "").replace(",", "")
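
    # The last three columns of each rules-table row appear to hold the
    # prize value, the odds of winning (1 in N), and the number of winning
    # tickets printed, in that order. Parse each row into
    # [num_tickets, odds, value]. The total number of tickets printed can
    # then be estimated from the most common (last) tier: its ticket count
    # times its odds.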
    prize_tuples = [
        [
            int(tds[-1].text.replace(",", "").strip()),
            float(tds[-2].text.replace(",", "").strip()),
            float(prize_value(tds[-3], game["price"])),
        ]
        for tds in [tr.find_all("td") for tr in prize_table.find_all("tr")[1:]]
    ]
    game["num_tx_initial"] = prize_tuples[-1][0] * prize_tuples[-1][1]
    game["state"] = "pa"
    combined_prizes = sorted(combine_prizes(deepcopy(prize_tuples)), key=lambda x: x[2])
    prizes_remaining_table = game_soup.find("table", class_="table-global").find(
        "tbody"
    )
    prizes_remaining = [
        [
            int(tds[1].text.strip()),
            float(tds[0].text.replace("$", "").replace(",", "").strip()),
        ]
        for tds in [tr.find_all("td") for tr in prizes_remaining_table.find_all("tr")]
    ]
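    # The game page only reports remaining counts for the top prize tiers.
    # Estimate the fraction of all tickets still in circulation by comparing
    # the remaining top-tier counts against the original top-tier counts
    # from the rules table.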
    percent_tx_remain = sum(p[0] for p in prizes_remaining) / sum(
        p[0] for p in combined_prizes[: -len(prizes_remaining) - 1 : -1]
    )
    combined_prizes = sorted(
        [[p[0], p[2]] for p in combined_prizes], key=lambda x: -x[1]
    )
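    # Use the reported remaining counts for the top tiers as-is, and scale
    # every lower tier's original count by the estimated fraction of
    # tickets remaining.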
    prizes = sorted(deepcopy(combined_prizes), key=lambda x: -x[1])
    prizes[: len(prizes_remaining)] = prizes_remaining
    for prize in prizes[len(prizes_remaining) :]:
        prize[0] = int(prize[0] * percent_tx_remain)
    game_prizes = []
    for p, orig in zip(prizes, combined_prizes):
        prize = {}
        prize["available"] = p[0]
        prize["claimed"] = orig[0] - p[0]
        prize["value"] = p[1]
        prize["prize"] = locale.currency(p[1], grouping=True)
        game_prizes.append(prize)
    game["prizes"] = game_prizes
    return game


def main():
    index_html = fetch_html(INDEX_URL)
    game_urls = find_game_urls(index_html)
    game_names = find_game_names(index_html)
    # For each game, fetch the game page and parse it into a game dict.
    # A failure to fetch or parse any single game shouldn't kill the
    # whole run, so log the error and move on to the next game.
    games = []
    for name, url in zip(game_names, game_urls):
        try:
            game_html = fetch_html(url)
        except Exception as e:
            logger.error("Error fetching %s: %s", url, e)
            continue
        try:
            games.append(parse_game_html(name, url, game_html))
        except Exception as e:
            _, _, tb = sys.exc_info()
            tb_msg = "\n".join(traceback.format_tb(tb))
            logger.error("Unable to parse game %s.\n%s\n%s", name, e, tb_msg)
    return games
if __name__ == "__main__":
games = main()
schema = GameSchema(many=True)
print(schema.dumps(games))

lottery_data_scraper/schemas.py

@@ -0,0 +1,76 @@
"""Some marshmallow schemas to do data validation and serialization.
How to use:
Create your model as a plain old Python object.
Example:
game = {}
game["game_id"] = "5"
game["price"] = 30
game["state"] = "tx"
Then create an instance of the schema.
schema = GameSchema()
Call `schema.dumps(game)` to "dump" your Python object to a string in JSON
format.
>>> game = {"game_id": "5", "price": 30, "state": "tx", "created_at": datetime.utcnow()}
>>> schema = GameSchema()
>>> schema.dumps(game)
'{"game_id": "5", "state": "tx", "created_at": "2023-04-08T05:58:49.494561", "price": 30.0, "image_urls": "[]"}'
And you can load a JSON string into a Python object with `schema.loads`.
>>> schema.loads(schema.dumps(game))
{'game_id': '5', 'state': 'tx', 'created_at': datetime.datetime(2023, 4, 8, 5, 58, 49, 494561), 'price': 30.0, 'image_urls': []}
Some fields, like `game_id`, are required. You can validate a Python object by calling `schema.validate`.
>>> game = {"price": 30, "state": "tx", "created_at": datetime.utcnow()}
>>> schema.dumps(game)
'{"state": "tx", "created_at": "2023-04-08T06:02:32.126541", "price": 30.0, "image_urls": "[]"}'
>>> schema.validate(game)
{'created_at': ['Not a valid datetime.']}
"""
from datetime import datetime
import json

from marshmallow import Schema, fields


class PrizeSchema(Schema):
    class Meta:
        render_module = json

    id = fields.Integer()
    game_id = fields.Integer()
    available = fields.Integer()
    claimed = fields.Integer()
    created_at = fields.DateTime(load_default=datetime.utcnow)
    value = fields.Number()
    prize = fields.Str()


class GameSchema(Schema):
    class Meta:
        render_module = json

    id = fields.Integer()
    created_at = fields.DateTime(load_default=datetime.utcnow)
    game_id = fields.Str(required=True)
    name = fields.Str()
    description = fields.Str()
    image_urls = fields.Function(
        lambda x: json.dumps(x.get("image_urls", [])),
        deserialize=lambda x: json.loads(x),
    )
    how_to_play = fields.Str()
    num_tx_initial = fields.Integer()
    price = fields.Number()
    prizes = fields.Nested(PrizeSchema, many=True)
    state = fields.Str()
    updated_at = fields.DateTime()
    url = fields.Str()

setup.py

@@ -0,0 +1,28 @@
from setuptools import setup, find_packages

setup(
    name="lottery_data_scraper",
    version="0.0.1",
    author="Eric Ihli",
    author_email="eihli@owoga.com",
    url="https://github.com/owogac/lottery_data_scraper",
    packages=find_packages(),
    install_requires=[
        "beautifulsoup4",
        "requests==2.28.2",
        "urllib3==1.26.15",
        "numpy",
        "pandas",
        "lxml",
        "html2text",
        "html5lib",
        "marshmallow==3.19.0",
        "selenium==3.141.0",
        "pybind11",
        # If you want to develop locally and don't want to mess around with
        # Xvfb (https://en.wikipedia.org/wiki/Xvfb), then just comment out
        # the next line before you run `python3 setup.py install`.
        "xvfbwrapper==0.2.9",
        "table_ocr==0.2.5",
    ],
)

tests/test_pennsylvania.py

@@ -0,0 +1,23 @@
import unittest

from lottery_data_scraper import pennsylvania


class TestPennsylvania(unittest.TestCase):
    def test_parse_game_html(self):
        # URL chosen arbitrarily
        url = "https://www.palottery.state.pa.us/Scratch-Offs/View-Scratch-Off.aspx?id=3201"
        html = pennsylvania.fetch_html(url)
        game = pennsylvania.parse_game_html("$3 Million Mega Stacks", url, html)
        self.assertEqual(game["name"], "$3 Million Mega Stacks")
        self.assertEqual(game["price"], 30)
        self.assertEqual(
            game["url"],
            "https://www.palottery.state.pa.us/Scratch-Offs/View-Scratch-Off.aspx?id=3201",
        )
        self.assertEqual(game["game_id"], "3201")
        self.assertEqual(game["prizes"][0]["prize"], "$3,000,000.00")
        # Perhaps unfortunately in dollars. Cents would be better, eh?
        self.assertEqual(game["prizes"][0]["value"], 3000000)