Data loader part of the mid-project for the Data Science bootcamp from Core Code School
Project description
Data Loader
Este proyecto contiene las herramientas necesarias para:
- Crear la base de datos
- Cargar datos en esa base de datos
- Consultar esos datos
Instalación
Biblioteca
Para usarlo como biblioteca basta con instalarlo con pip
pip install covid-data
CLI
Lamentablmente, por el momento el CLI no funciona si se instala con pip
. Para poder usar el CLI es necesario hacer clone
del repositorio. Una ve lo tienes, instalar las dependencias:
pip install -r requirements.txt
Si usas Poetry puedes instalar las dependencias con
poetry install
Es necesario instalar el propio paquete para que los imports funcionen:
pip install -e .
Una vez instaladas las dependencias, puedes usar el CLI
cd covid_data
./covid_data --help
Opciones de configuración
Este proyecto se vale de las siguientes variables de entorno para su configuración:
POSTGRES_USER: Usuario de Postgree
POSTGRES_PASS: Contraseña de Postgres
POSTGRES_HOST: Host donde se encuentra Postgres
POSTGRES_PORT: Puerto para acceder a Postgres
POSTGRES_DB: Base de datos a la que conectarse
CAGEDATA_API_KEY: Clave api de OpenCageData para enriquecer datos con localizaciones
Ahora voy a detallar las formas de usar el proyecto
Como CLI
Para cargar los datos se puede usar como CLI. Lo primero será crear la base de datos con el esquema disponible aquí. La estructura de la base de datos es la siguiente:
Una vez está creada la base de datos, se pueden cargar los datos de los archivos CSV con el siguiente comando:
./covid_data loadcsv /path/to/confirmed.csv,/path/to/dead.csv,/path/to/recovered.csv -tf -o
Para obtener ayuda sobre cómo usar el comando, ejecuta:
❯ ./covid_data --help
Usage: app.py [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
loadcsv Loads FILES as CSV data.
scrap Scrap cases of chosen COUNTRY.
Hay 2 comandos disponibles
loadcsv
❯ ./covid_data loadcsv --help
Usage: app.py loadcsv [OPTIONS] FILES
Loads FILES as CSV data. If you want to load several files, each file should
be separated by comma
Options:
-t, --type TEXT Type of cases contained in each file, separated by
comma. Leave blank if using --type-in-file
-tf, --type-in-file Set this to true if the file names are <case_type>.csv
Being <case_type> one of confirmed, recovered or dead
-o, --optimize Set to true to skip lines for places that has more
cases than columns on the CSV
--help Show this message and exit.
Este comando te permite procesar y cargar en la base de datos uno o más archivos CSV. Cada archivo debe contener información de un solo tipo de casos.
Las rutas a los archivos deben ir separados por comas.
De igual manera, el tipo de caso de cada archivo debe ir separados por comas en el argumento --type
y en el mismo orden que los archivos.
Otra opción es no usar el argumento --type
y nombrar los archivos con el tipo de caso que contienten, e.g. recovered.csv
. Si se toma esta alternativa, se debe usar el parámetro --type-in-file
.
Por último, el argumento --optimize
se saltará todas las filas en las que el país al que hacen referencia ya tenga el mismo (o más) número de casos del tipo del archivo, que columnas tiene el CSV.
Un ejemplo de uso de este comando para cargar los CSVs con todos los datos es:
./covid_data loadcsv /path/to/confirmed.csv,/path/to/dead.csv,/path/to/recovered.csv -tf -o
Usa el nombre del archivo para nombrar el tipo de caso y la optimización.
scrap
Este comando permite hacer scrapping de un país concreto. Se usa para ampliar datos por país.
El uso de este comando se detalla aquí:
Usage: app.py scrap [OPTIONS] COUNTRY
Scrap cases of chosen COUNTRY. To check available countries to scrap use
--check
Options:
--check Use this to check available countries
--start-date TEXT Date to start scraping cases from, in format DD/MM/YYYY
--help Show this message and exit.
Hay un argumento que es obligatorio y es el país a scrappear. Si se quiere saber qué países hay disponibles, basta con poner cualquier valor en COUNTRY
y pasar el parámetro --check
:
❯ ./covid_data scrap XXX --check
Available countries are:
France
Spain
Y, una vez se sabe qué país se va a usar, se lanza el scrapper:
❯ ./covid_data scrap spain
Fetching cases for province 1/20
Fetching cases for province 2/20
Fetching cases for province 3/20
...
Se puede especificar la fecha de inicio usando el argumento --start-date
. La fecha debe estar en formato DD/MM/YYYY
./covid_data scrap France --start-date 01/08/2021
Añadir nuevos scrappers
Añadir un nuevo scrapper para un país es tan sencillo como colocar un archivo <country_name>.py
en la carpeta scrappers
y que ese archivo exponga una función:
def scrap(db_engine: psycopg2._psycopg.connection, start_date: datetime.datetime)
Como biblioteca
La parte más interesante de este paquete como biblioteca es el sumbódulo db
, que se detalla a continuación.
Glossary
connection: psycopg2._psycopg.connection
CaseType: covid_data.types.CaseType
Aggregations: covid_data.types.Aggregations
CaseType: covid_data.types.CaseType
OnConflictStrategy: covid_data.types.OnConflictStrategy
OrderBy: covid_data.types.OrderBy
PlaceProperty: covid_data.types.PlaceProperty
PlaceTable: covid_data.types.PlaceTable
PlaceType: covid_data.types.PlaceType
covid_data.db
def get_db(
user: str = None,
passwd: str = None,
host: str = None,
port: str = None,
db: str = None,
) -> connection:
Devuelve una conexión a la base de datos. Esta conexión debe ser cerrada manualmente al terminar. La configuración de la conexión se puede pasar directamente al método (si se usa como biblioteca) o por variables de entorno (si se usa como CLI). Las variables de entorno son las siguientes:
POSTGRES_USER
POSTGRES_PASS
POSTGRES_HOST
POSTGRES_PORT
POSTGRES_DB
def close_db(conn: connection) -> Callable
Recibe una conexión y devuelve una lambda que cierra la conexión al ser llamada. Esto se hace así para poder pasarle esta función como callback al ExitStack
y poder cerrarla automáticamente al finalizar una función.
covid_data.db.queries
def place_exists(
place: str, engine: connection, table: PlaceTable = PlaceTable.COUNTRY
) -> Union[str, None]:
"""
Given a place name and a table, returns its ID or none if not exits
"""
def get_country_by_alpha2(country: str, engine: connection) -> Optional[dict]:
"""
Given an alpha2 code, return a country
"""
def get_country_by_alpha3(country: str, engine: connection) -> Optional[dict]:
"""
Given an alpha3 code, return a country
"""
def get_province_by_alpha2(province: str, engine: connection) -> Optional[dict]:
"""
Given an alpha2 code, return a province
"""
def get_county_by_alpha2(county: str, engine: connection) -> Optional[dict]:
"""
Given an alpha2 code, return a county
"""
def get_country_by_id(country_id: str, engine: connection) -> Optional[dict]:
"""
Given an id, return a country
"""
def get_province_by_id(province_id: str, engine: connection) -> Optional[dict]:
"""
Given an id, return a province
"""
def get_province_by_name(province: str, engine: connection) -> Optional[dict]:
"""
Given a name, return a country
"""
def get_county_by_id(county_id: str, engine: connection) -> Optional[dict]:
"""
Given an id, return a county
"""
def row_to_dict(
rows: Iterable, table_or_cols: Union[str, Iterable[str]], engine: connection
) -> list[dict]:
"""
Given an array of rows and a table or a column lists, transform the rows to an array of dicts, with columns as keys
"""
def ensure_array(element) -> List:
"""
Given an object, ensure it's an array
"""
def create_country(country: dict, engine: connection) -> str:
"""
Given a dict with all country data, insert it in the database
"""
def create_province(province: dict, engine: connection) -> str:
"""
Given a dict with all province data, insert it in the database
"""
def create_county(county: dict, engine: connection) -> str:
"""
Given a dict with all county data, insert it in the database
"""
def get_cases_by_country(
country_id: int, engine: connection, case_type: CaseType = None
) -> List[Dict]:
"""
Given a dict with all country data, insert it in the database
"""
def get_cases_by_province(
province_id: int, engine: connection, case_type: CaseType = None
) -> List[Dict]:
"""
Given a province id, return all the cases from that provicen, filtered by type if provided
"""
def get_cases_by_filters_query(
country_id: int = None,
province_id: int = None,
date: datetime = None,
date_lte: datetime = None,
date_gte: datetime = None,
case_type: CaseType = None,
aggregation: list[Aggregations] = [],
limit: int = None,
sort: list[str] = [],
) -> Dict[str, Any]:
"""
Returns a query that matches all the filters passed. The returned query is splitted so that the caller can manipulate it before executing. The return dict is in the form:
{
"select": select,
"from": from_,
"query": query,
"params": tuple(params),
"columns": tuple(columns),
}
"""
def get_cum_cases_by_date(
engine: connection,
date: datetime = None,
date_lte: datetime = None,
date_gte: datetime = None,
case_type: CaseType = None,
) -> List[Dict]:
"""
Returns the cummulative sum of cases aggregated by date. Filtered by date and type if provided
"""
def get_cum_cases_by_date_country(
engine: connection,
country_id: int,
date: datetime = None,
date_lte: datetime = None,
date_gte: datetime = None,
case_type: CaseType = None,
) -> List[Dict]:
"""
Returns the cummulative sum of cases aggregated by date and contry. Filtered by date and type if provided
"""
def get_cum_cases_by_country(
engine: connection,
date: datetime = None,
date_lte: datetime = None,
date_gte: datetime = None,
case_type: CaseType = None,
) -> List[Dict]:
"""
Returns the cummulative sum of cases aggregated by country. Filtered by date and type if provided
"""
def get_cum_cases_by_province(
engine: connection,
date: datetime = None,
date_lte: datetime = None,
date_gte: datetime = None,
case_type: CaseType = None,
country_id: int = None,
) -> List[Dict]:
"""
Returns the cummulative sum of cases aggregated by province. Filtered by date and type if provided
"""
def create_case(
case: dict,
engine: connection,
conflict_strategy: OnConflictStrategy = OnConflictStrategy.REPLACE,
) -> bool:
"""
Given all the data regarding a case, create it. Also takes a replace strategy that can be set to replace or upsert.
"""
def get_all_countries(
engine: connection, name: str = None, near: list[float] = []
) -> list[dict]:
"""
Get all countries filtered by name if provided and ordered by distance to `near`
"""
def get_all_provinces(engine: connection) -> list[dict]:
"""
Get all provinces
"""
def get_provinces_by_country(engine: connection, country_id: int) -> list[dict]:
"""
Get all provinces that belongs to a specific country
"""
def insert_api_key(engine: connection, hashed_key: str) -> bool:
"""
Creates an api key
"""
def check_api_key(engine: connection, hashed_key: str) -> bool:
"""
Check if the incoming key exists
"""
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for covid_data-0.1.19-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 58bd9cf218188b889c7ca8c4cd0bb2a73386f55eb839cbaad11d516ae2994e7a |
|
MD5 | a826d2a86666901737943177d5e56fcd |
|
BLAKE2b-256 | ebb35e0544a23cae9aed094905259e90c3babe11cb126b0458bdb9e77dc5e50f |