Added sqlite export of .MCA file's record spec. This won't live in /data, but in a user's cache. This is to allow user choice on how and when to update the timetable files and reduce redundancy.

This commit is contained in:
2026-05-22 11:59:45 +01:00
parent fc09eb775e
commit d63f151c9b
2 changed files with 73 additions and 5 deletions
+3 -5
View File
@@ -3,10 +3,8 @@ from national_rail_timetable.parsing import (
extract_specification_document_tables,
store_specification_table_raws,
read_specification_table_raws,
create_mca_specification_dbschema,
main,
)
# print(fetch_nr_token())
# print(fetch_nr_timetable_files())
tables = extract_specification_document_tables()
print(store_specification_table_raws(tables))
print(read_specification_table_raws())
print(main())
+70
View File
@@ -6,12 +6,15 @@ Aimed primarily towards producing a reduced sqlite database.
# pyright: reportUnknownVariableType=false
# pyright: reportUnknownArgumentType=false
# pyright: reportUnknownMemberType=false
# pyright: reportUnknownLambdaType=false
# Imports
from itertools import pairwise
from pathlib import Path
import pandas as pd
import numpy as np
import sqlite3
import os
from pypdf import PageObject, PdfReader
@@ -144,3 +147,70 @@ def read_specification_table_raws(
continue
tables[path.name[:-4]] = pd.read_csv(path)
return tables
def create_mca_specification_dbschema(
tables: dict[str, pd.DataFrame],
db_path: Path | None = None,
):
db_path = (
db_path
if db_path is not None
else Path(os.environ.get("NR_DATADIR", "~/.cache/nr_data/timetable.db"))
)
db_path.parent.mkdir(exist_ok=True, parents=True)
connection = sqlite3.connect(db_path)
cursor = connection.cursor()
for name, df in tables.items():
if (_n := name.split("_"))[0] != "MCA" or len(_n) != 2:
continue
df["Start Index"] = df["Position"].apply(lambda s: int(s.split("-")[0]) - 1)
df["End Index"] = df["Position"].apply(lambda s: int(s.split("-")[-1]))
_ = cursor.execute(f"DROP TABLE IF EXISTS spec_{name.lower()}")
_ = cursor.execute(
f"""
CREATE TABLE spec_{name.lower()}
({", ".join([col.lower().replace(" ", "_") for col in df.columns])})
""",
)
_ = cursor.executemany(
f"""
INSERT INTO spec_{name.lower()}
VALUES({", ".join(["?" for _ in df.columns])})
""",
[list(row.values) for _, row in df.iterrows()],
)
connection.commit()
connection.close()
return db_path
# Script
def main(
skip_pdf: bool = False,
pdf_spec_path: Path | None = None,
raw_spec_dir: Path | None = None,
):
if not skip_pdf:
try:
tables = extract_specification_document_tables(pdf_spec_path)
_ = store_specification_table_raws(tables, raw_spec_dir)
except FileNotFoundError:
pass
try:
tables = read_specification_table_raws(raw_spec_dir)
except FileNotFoundError:
raise FileNotFoundError(
"The tables generated from the RSP's specification were not found. "
+ "This means neither the cached version nor the original .pdf is available. "
+ "Try suppling either to their default locations, or supplying custom directories. "
+ "Manual fix: extract_specification_document_tables then store_specification_table_raws. "
)
_ = create_mca_specification_dbschema(tables)
if __name__ == "__main__":
main()