import os
import dvc.api
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import requests
import json
import functools, itertools
import polars as pl
from pyprojroot.here import hereGeneration Ingest
The goal of this pipeline is to update a parquet file with generation data from the French grid.
This is the ingest step:
- Try to read a data file to get start date, use default otherwise.
- Use minimum of today’s date or start-date plus two weeks as end date.
- Assemble query, execute for API
The generation data is available from 2017-01-01, onwards.
Let’s determine the start_date and the end_date for the request:
the start date will be the larger of:
- 2017-01-01
- the end of the most-recent interval in the dataset
the end date will be the smaller of:
- the start date plus 14 days
- the current date
Dates are expressed as midnight, Paris time.
The timezone is a parameter that we share among multiple files in each of the pipelines. so we read a parameter:
params = dvc.api.params_show("dvc-params.yaml")To get the pipeline started, we read the published metadata, which contains the most-recent valid date. Note that although we are reading from this file, we not declare it a dependency in dvc.yaml. This is because to declare this file would be to create a circular dependency, which DVC would refuse to accept.
tz_local = ZoneInfo(params["tz_local"])
# not including this as a DVC dependency, as this would make things "circular"
file_transform_meta = here("data/99-publish/generation-meta.json")
# if file exists, get most-recent date
date_end_previous = None
if os.path.isfile(file_transform_meta):
with open(file_transform_meta, "r") as file:
meta = json.load(file)
date_end_previous = datetime.fromisoformat(meta.get("interval_end")).astimezone(
tz_local
)Let’s also get the current date (start of today, Paris time):
date_today = datetime.now(tz_local).replace(hour=0, minute=0, second=0, microsecond=0)We now have enough information to determine the start and end dates for the API call:
# timedelta() seems to use periods rather than intervals;
# i.e., it takes DST into account and returns same wall-clock time.
date_start = datetime(2017, 1, 1, tzinfo=tz_local)
if date_end_previous is not None:
date_start = date_end_previous
date_end = date_start + timedelta(days=14)
# amend date_end if in the future
date_end = min(date_end, date_today)Given the context, here are the start and end dates:
print(date_start.isoformat())
print(date_end.isoformat())2023-07-27T18:30:00+02:00
2023-08-10T18:30:00+02:00
We need to request a token:
auth = requests.post(
"https://digital.iservices.rte-france.com/token/oauth/",
headers={
"Authorization": f'Basic {os.environ["RTE_FRANCE_BASE64"]}',
"Content-Type": "application/x-www-form-urlencoded",
},
)
token = auth.json()["access_token"]We compose a request, gather the response, then pull out the data:
endpoint = "https://digital.iservices.rte-france.com/open_api/actual_generation/v1/generation_mix_15min_time_scale"
response = requests.get(
f"{endpoint}/?start_date={date_start.isoformat()}&end_date={date_end.isoformat()}",
headers={
"Host": "digital.iservices.rte-france.com",
"Authorization": f"Bearer {token}",
},
)Check the response:
response.okTrue
If the response is OK, write out JSON content to file:
if response.ok:
array = response.json()["generation_mix_15min_time_scale"]
with open(here("data/01-ingest/generation.json"), "w") as file:
json.dump(array, file)