diff --git a/pyproject.toml b/pyproject.toml index d332f6a..2f84cc0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,10 @@ authors = [ ] requires-python = ">=3.14" dependencies = [ - "requests (>=2.34.2,<3.0.0)" + "requests (>=2.34.2,<3.0.0)", + "pypdf (>=6.12.0,<7.0.0)", + "pandas (>=3.0.3,<4.0.0)", + "pandas-stubs (>=3.0.0.260204,<4.0.0.0)" ] diff --git a/src/national_rail_timetable/__main__.py b/src/national_rail_timetable/__main__.py index 765508a..c332b91 100644 --- a/src/national_rail_timetable/__main__.py +++ b/src/national_rail_timetable/__main__.py @@ -1,4 +1,6 @@ from national_rail_timetable.nr_requests import fetch_nr_token, fetch_nr_timetable_files +from national_rail_timetable.parsing import extract_specification_document_tables -print(fetch_nr_token()) -print(fetch_nr_timetable_files()) +# print(fetch_nr_token()) +# print(fetch_nr_timetable_files()) +print(extract_specification_document_tables()) diff --git a/src/national_rail_timetable/parsing.py b/src/national_rail_timetable/parsing.py new file mode 100644 index 0000000..8e7f069 --- /dev/null +++ b/src/national_rail_timetable/parsing.py @@ -0,0 +1,116 @@ +""" +Parsing tools for the ZipFile returned by nr_requests. +Aimed primarily towards producing a reduced sqlite database. +""" +# pyright: reportAny=false +# pyright: reportUnknownVariableType=false +# pyright: reportUnknownArgumentType=false +# pyright: reportUnknownMemberType=false + +# Imports +from itertools import pairwise +from pathlib import Path +import pandas as pd +import numpy as np +from pypdf import PageObject, PdfReader + + +# Init. +SPECIFICATION_TABLE_LOCATIONS = { + "MCA_HD": (15, 0), + "MCA_BS": (16, 0), + "MCA_BX": (17, 0), + "MCA_LO": (17, 1), + "MCA_LI": (18, 0), + "MCA_CR": (18, 1), + "MCA_LT": (19, 0), + "MCA_AA": (21, 0), + "MCA_TI": (22, 0), + "MCA_TA": (22, 1), + "MCA_TD": (23, 0), + "MCA_ZZ": (23, 1), +} + + +# Functions +# TODO: Implement better header check and row breaks to ingest .ALF and others. +def extract_specification_document_tables( + path: Path | None = None, # pyright: ignore[reportRedeclaration] +) -> dict[str, pd.DataFrame]: + path: Path = ( + path + if path is not None + else Path(__file__).parents[2] / "data/Timetable Specification.pdf" + ) + if not path.exists() or path.is_dir(): + raise FileNotFoundError( + "The path given does not exist or is a folder. " + + "Please supply the path to the RSP's Timetable Specification document. " + + "Or move it to the path expected. " + + "The file can be found at https://www.rspaccreditation.org/publicDocumentation.php#RSPS5xxx. " + + "Please select RSPS5046 pertaining to timetable data. " + + f'Path tried: "{path}"' + ) + reader = PdfReader(path) + + tables: dict[str, pd.DataFrame] = {} + for key, (page_number, table_number) in SPECIFICATION_TABLE_LOCATIONS.items(): + page: PageObject = reader.pages[page_number] + + text = page.extract_text(extraction_mode="layout") + text = text.split("Field description")[table_number + 1] + text = text[len(text.split("\n")[0]) + 1 :] + text = text.split("\n\n")[0] + + lines = text.split("\n") + max_line_length = max([len(line) for line in lines]) + characters = np.array( + [ + [ + character + for character in (line + " " * max_line_length)[:max_line_length] + ] + for line in lines + ], + dtype="U1", + ) + + column_breaks = np.sum(np.equal(characters, " "), axis=0) == characters.shape[0] + column_breaks = column_breaks[2:] & column_breaks[1:-1] & column_breaks[:-2] + column_breaks = [True, *column_breaks, False, False] + column_breaks = np.diff(np.array(column_breaks, dtype=np.int8)) == 1 + + row_breaks = ( + characters[:, np.arange(characters.shape[1])[column_breaks][0] - 1] != " " + ) + + records: list[list[str]] = [] + for rl, ru in pairwise([*np.arange(characters.shape[0])[row_breaks], None]): + records.append([]) + for cl, cu in pairwise( + [0, *np.arange(characters.shape[1])[column_breaks], None] + ): + records[-1].append( + " ".join( + "".join( + [ + character + for character in np.ravel(characters[rl:ru, cl:cu]) + ] + ) + .strip() + .split() + ) + ) + tables[key] = pd.DataFrame( + records, + columns=[ + "Field", + "Field Description", + "Length", + "Position", + "Notes", + ], + ) + + return tables