Flow Ingest

The goal of this pipeline is to update a parquet file with flow data from the French grid.

This is the ingest step:

The flow data is available from 2014-12-16, onwards. However, given that the generation data are available starting 2017-01-01, we’ll use that as a start date.

import os
import dvc.api
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import requests
import json
import functools, itertools
import polars as pl
from pyprojroot.here import here

Let’s determine the start_date and the end_date for the request:

Dates are expressed as midnight, Paris time.

The timezone is a parameter that we share among multiple files in each of the pipelines. so we read a parameter:

params = dvc.api.params_show("dvc-params.yaml")

The timezone is a parameter that we share among multiple files in each of the pipelines. so we read a parameter:

To get the pipeline started, we read the published metadata, which contains the most-recent valid date. Note that although we are reading from this file, we not declare it a dependency in dvc.yaml. This is because to declare this file would be to create a circular dependency, which DVC would refuse to accept.

tz_local = ZoneInfo(params["tz_local"])

# not including this as a DVC dependency, as this would make things "circular"
file_transform_meta = here("data/99-publish/flow-meta.json")

# if file exists, get most-recent date
date_end_previous = None
if os.path.isfile(file_transform_meta):
    with open(file_transform_meta, "r") as file:
        meta = json.load(file)
        date_end_previous = datetime.fromisoformat(meta.get("interval_end")).astimezone(
            tz_local
        )

Let’s also get the current date (start of today, Paris time):

date_today = datetime.now(tz_local).replace(hour=0, minute=0, second=0, microsecond=0)

We now have enough information to determine the start and end dates for the API call:

# timedelta() seems to use periods rather than intervals;
# i.e., it takes DST into account and returns same wall-clock time.

date_start = datetime(2017, 1, 1, tzinfo=tz_local)
if date_end_previous is not None:
    date_start = date_end_previous

date_end = date_start + timedelta(days=14)

# amend date_end if in the future
date_end = min(date_end, date_today)

Given the context, here are the start and end dates:

print(date_start.isoformat())
print(date_end.isoformat())
2017-06-27T16:00:00+02:00
2017-07-11T16:00:00+02:00

We need to request a token:

auth = requests.post(
    "https://digital.iservices.rte-france.com/token/oauth/",
    headers={
        "Authorization": f'Basic {os.environ["RTE_FRANCE_BASE64"]}',
        "Content-Type": "application/x-www-form-urlencoded",
    },
)
token = auth.json()["access_token"]

We compose a request, gather the response, then pull out the data:

endpoint = "https://digital.iservices.rte-france.com/open_api/physical_flow/v1/physical_flows"

response = requests.get(
    f"{endpoint}/?start_date={date_start.isoformat()}&end_date={date_end.isoformat()}",
    headers={
        "Host": "digital.iservices.rte-france.com",
        "Authorization": f"Bearer {token}",
    },
)

Check the response:

response.ok
True

If the response is OK, write out JSON content to file:

if response.ok:
    array = response.json()["physical_flows"]

    with open(here("data/01-ingest/flow.json"), "w") as file:
        json.dump(array, file)