traewelling-to-travelynx/importer.py

139 lines
4.6 KiB
Python

#!/usr/bin/env python3
import json
import logging
from datetime import datetime, timezone
from sys import exit
from requests import Session
TRAEWELLING_API_TOKEN = ""
TRAEWELLING_USER = ""
TRAVELYNX_API_TOKEN = ""
IMPORT_UNTIL = "2021-06-28"
# -------------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
l = logging.getLogger(__name__)
s = Session()
s.headers = {
"Accept": "application/json",
}
# get all checkins from traewelling
url = f"https://traewelling.de/api/v1/user/{TRAEWELLING_USER}/statuses"
checkins = []
while url is not None:
try:
l.info(f"requesting statuses from {url}")
r = s.get(
url,
headers={
"X-CRSF-TOKEN": "",
"Authorization": f"Bearer {TRAEWELLING_API_TOKEN}",
},
)
r.raise_for_status()
result_json = r.json()
checkins.extend(result_json.get("data", []))
url = result_json.get("links", {}).get("next")
except Exception:
l.exception(f"error while getting data from {url}")
if not checkins:
# only exit if we have had no checkins at all
exit(1)
try:
with open("state.json") as f:
processed_checkins = json.load(f)["checkins"]
except FileNotFoundError:
processed_checkins = []
time_limit = datetime.strptime(IMPORT_UNTIL, "%Y-%m-%d").astimezone(timezone.utc)
for checkin in sorted(checkins, key=lambda c: c["createdAt"]):
try:
l.info(f"processing checkin {checkin['id']} created at {checkin['createdAt']}")
created = datetime.fromisoformat(checkin["createdAt"])
if created >= time_limit:
l.warning("SKIP: newer than IMPORT_UNTIL")
continue
if checkin["id"] in processed_checkins:
l.warning("SKIP: already processed")
continue
train_type = checkin["train"]["lineName"].split(" ")[0]
comment = [f"Import from Traewelling, ID {checkin['id']}"]
if checkin["body"]:
comment.append(checkin["body"])
if checkin["event"]:
comment.extend([checkin["event"]["name"], checkin["event"]["url"]])
travelynx_data = {
"token": TRAVELYNX_API_TOKEN,
"dryRun": False,
"lax": True,
"cancelled": False,
"train": {
"type": train_type,
"line": None,
"no": checkin["train"]["number"],
},
"fromStation": {
"name": checkin["train"]["origin"]["rilIdentifier"]
or checkin["train"]["origin"]["name"],
"scheduledTime": int(
datetime.fromisoformat(
checkin["train"]["origin"]["departurePlanned"]
or checkin["train"]["origin"]["departure"]
).timestamp()
),
"realTime": int(
datetime.fromisoformat(
checkin["train"]["origin"]["departureReal"]
).timestamp()
)
if checkin["train"]["origin"]["departureReal"]
else None,
},
"toStation": {
"name": checkin["train"]["destination"]["rilIdentifier"]
or checkin["train"]["destination"]["name"],
"scheduledTime": int(
datetime.fromisoformat(
checkin["train"]["destination"]["departurePlanned"]
or checkin["train"]["destination"]["departure"]
).timestamp()
),
"realTime": int(
datetime.fromisoformat(
checkin["train"]["destination"]["departureReal"]
).timestamp()
)
if checkin["train"]["destination"]["departureReal"]
else None,
},
# "intermediateStops": [],
"comment": " - ".join(comment),
}
l.debug(json.dumps(travelynx_data))
r = s.post(
"https://travelynx.franzi.business/api/v1/import", json=travelynx_data
)
l.debug(r.text)
r.raise_for_status()
result = r.json()
if result["success"]:
l.info("added to travelynx")
processed_checkins.append(checkin["id"])
else:
l.error(f"failed with error: {result['error']}")
l.error(travelynx_data)
except Exception:
l.exception("error occured!")
break
print(processed_checkins)
with open("state.json", "w") as f:
json.dump({"checkins": processed_checkins}, f)