Modificato in una REST API con Flask

This commit is contained in:
Fabio Scotto di Santolo
2021-06-20 15:08:59 +02:00
parent 2e1b532e5c
commit cf9ab44d5d
7 changed files with 275 additions and 33 deletions

76
app.py Normal file
View File

@@ -0,0 +1,76 @@
import itertools
from datetime import date
from typing import List, Dict
from dateutil import relativedelta as period
from flask import Flask
from db.access import fetch_european_departments, \
fetch_department_employees, \
fetch_american_departments, \
fetch_employees, \
fetch_departments, \
fetch_canadian_departments
from models.department import Department
app = Flask(__name__)
@app.get("/")
def index():
return "<h1>Homepage HR Resources</h1>"
@app.errorhandler(404)
@app.errorhandler(500)
def handle_error(error):
return {
"status": error.code,
"description": error.description,
"name": error.name
}
@app.get("/employee")
def employees():
return {"employees": [e.to_json() for e in fetch_employees()]}
def __employees_group_by_seniority(departments: List[Department]) -> Dict[str, dict]:
result = {}
for department in departments:
# Per ogni dipartimento ricerco i suoi impiegati
employees = fetch_department_employees(department.department_id)
# Raggruppo gli impiegati per anni di anzianità
employee_group_by_years = itertools.groupby(employees,
lambda e: period.relativedelta(date.today(), e.hire_date).years)
result[department.name] = {year: [e.to_json() for e in employees]
for (year, employees) in employee_group_by_years}
return result
@app.get("/employee/all-by-seniority")
def employees_by_seniority():
departments = fetch_departments()
return __employees_group_by_seniority(departments)
@app.get("/employee/american-by-seniority")
def american_employees_by_seniority():
american_departments = fetch_american_departments()
return __employees_group_by_seniority(american_departments)
@app.get("/employee/canadian-by-seniority")
def canadian_employees_by_seniority():
canadian_departments = fetch_canadian_departments()
return __employees_group_by_seniority(canadian_departments)
@app.get("/employee/european-by-seniority")
def european_employees_by_seniority():
european_departments = fetch_european_departments()
return __employees_group_by_seniority(european_departments)

View File

@@ -23,6 +23,100 @@ def __create_department(row):
))
def fetch_departments() -> List[Department]:
"""
Ricerca tutti i dipartimenti attivi
:return: elenco dei dipartimenti
"""
with connect(db_url) as connection:
with connection.cursor(cursor_factory=dbopts.DictCursor) as cursor:
cursor.execute(
"""
select
d.department_id,
d.department_name,
d.location_id,
l.postal_code,
l.street_address,
l.city,
l.state_province,
l.country_id
from
hr.departments d
left join hr.locations l on
d.location_id = l.location_id
"""
)
departments = [__create_department(row) for row in cursor.fetchall()]
return departments
def fetch_american_departments() -> List[Department]:
"""
Ricerca tutti i dipartimenti presenti negli USA.
:return: elenco dei dipartimenti USA
"""
with connect(db_url) as connection:
with connection.cursor(cursor_factory=dbopts.DictCursor) as cursor:
cursor.execute(
"""
select
d.department_id,
d.department_name,
d.location_id,
l.postal_code,
l.street_address,
l.city,
l.state_province,
l.country_id
from
hr.departments d
left join hr.locations l on
d.location_id = l.location_id
where
l.country_id = 'US'
"""
)
departments = [__create_department(row) for row in cursor.fetchall()]
return departments
def fetch_canadian_departments() -> List[Department]:
"""
Ricerca tutti i dipartimenti presenti in Canada.
:return: elenco dei dipartimenti canadesi
"""
with connect(db_url) as connection:
with connection.cursor(cursor_factory=dbopts.DictCursor) as cursor:
cursor.execute(
"""
select
d.department_id,
d.department_name,
d.location_id,
l.postal_code,
l.street_address,
l.city,
l.state_province,
l.country_id
from
hr.departments d
left join hr.locations l on
d.location_id = l.location_id
where
l.country_id = 'CA'
"""
)
departments = [__create_department(row) for row in cursor.fetchall()]
return departments
def fetch_european_departments() -> List[Department]:
"""
Ricerca tutti i dipartimenti dei paesi europei dove sono presenti
@@ -78,13 +172,24 @@ def fetch_department_employees(department_id: int) -> List[Employee]:
e.job_id,
e.salary,
e.manager_id,
e.department_id
e.department_id,
d.department_name,
l.location_id,
l.city,
l.postal_code,
l.state_province,
l.street_address,
l.country_id
from
hr.employees e
left join hr.departments d on
e.department_id = d.department_id
left join hr.locations l on
d.location_id = l.location_id
where
e.department_id = {department_id}
e.department_id = %s
""",
department_id
(department_id,)
)
# Prendo dal DB tutti i dipendenti di un dipartimento
@@ -99,7 +204,62 @@ def fetch_department_employees(department_id: int) -> List[Employee]:
job_id=row["job_id"],
salary=row["salary"],
manager_id=row["manager_id"],
department=None
department=__create_department(row)
))
return employees
def fetch_employees() -> List[Employee]:
"""
Ricerca tutti i dipendenti dell'azienda
:return: elenco dei dipendenti
"""
employees = []
with connect(db_url) as connection:
with connection.cursor(cursor_factory=dbopts.DictCursor) as cursor:
cursor.execute(
f"""
select
e.employee_id,
e.first_name,
e.last_name,
e.email,
e.phone_number,
e.hire_date,
e.job_id,
e.salary,
e.manager_id,
e.department_id,
d.department_name,
l.location_id,
l.city,
l.postal_code,
l.state_province,
l.street_address,
l.country_id
from
hr.employees e
left join hr.departments d on
e.department_id = d.department_id
left join hr.locations l on
d.location_id = l.location_id
"""
)
# Prendo dal DB tutti i dipendenti di un dipartimento
for row in cursor.fetchall():
employees.append(Employee(
employee_id=row["employee_id"],
first_name=row["first_name"],
last_name=row["last_name"],
email=row["email"],
phone_number=row["phone_number"],
hire_date=row["hire_date"],
job_id=row["job_id"],
salary=row["salary"],
manager_id=row["manager_id"],
department=__create_department(row)
))
return employees

26
main.py
View File

@@ -1,26 +0,0 @@
import itertools
from datetime import date
from dateutil import relativedelta as datedelta
from db.access import fetch_european_departments, fetch_department_employees
if __name__ == '__main__':
# Ricerco tutti i dipartimenti europei
for department in fetch_european_departments():
print(f"\n####### Department of {department.name} #######\n")
# Per ogni dipartimento ricerco i suoi impiegati
employees = [employee for employee in fetch_department_employees(department.department_id)]
for employee in employees:
print(f"{employee.last_name} {employee.first_name}")
print("\n")
# Raggruppo gli impiegati per anni di anzianità
calculate_seniority = lambda e: datedelta.relativedelta(date.today(), e.hire_date).years
employee_group_by_years = itertools.groupby(employees, calculate_seniority)
# Stampo i risultati dell'aggregazione
for years_to_employees in employee_group_by_years:
print(f"Employees with {years_to_employees[0]} years of seniority:")
for employee in years_to_employees[1]:
print(employee.last_name, employee.first_name)
print("\n")

View File

@@ -19,5 +19,12 @@ class Department:
def location_id(self) -> Location:
return self.__location
def to_json(self):
return {
'department_id': self.__department_id,
'name': self.__name,
'location': self.__location.to_json() if self.__location is not None else None
}
def __str__(self):
return f"Department({self.__department_id}, {self.__name}, {self.__location})"

View File

@@ -18,8 +18,8 @@ class Person:
class Employee(Person):
def __init__(self, employee_id, first_name, last_name, email, phone_number,
hire_date, department, job_id=0, salary=0, manager_id=0):
def __init__(self, employee_id: int, first_name: str, last_name: str, email: str, phone_number: str,
hire_date: date, department: Department, job_id: int = 0, salary: float = 0, manager_id: int = 0):
super().__init__(first_name, last_name)
self.__employee_id = employee_id
self.__email = email
@@ -62,6 +62,20 @@ class Employee(Person):
def manager_id(self) -> int:
return self.__manager_id
def to_json(self):
return {
'employee_id': self.__employee_id,
'first_name': self.first_name,
'last_name': self.last_name,
'email': self.__email,
'hire_date': self.__hire_date,
'department': self.__department.to_json() if self.__department is not None else None,
'job_id': self.__job_id,
# FIXME devo gestire meglio i tipi decimali
'salary': str(self.__salary),
'manager_id': self.__manager_id
}
def __str__(self):
return f"Employee({self.__employee_id}, {self.first_name}, {self.last_name}, " \
f"{self.__email}, {self.__phone_number}, {self.__hire_date})"

View File

@@ -32,6 +32,16 @@ class Location:
def country_id(self) -> int:
return self.__country_id
def to_json(self):
return {
'location_id': self.__location_id,
'street_address': self.__street_address,
'postal_code': self.__postal_code,
'city': self.__city,
'state_province': self.__state_province,
'country_id': self.__country_id
}
def __str__(self):
return f"Location({self.__location_id}, {self.__postal_code}, " \
f"{self.__street_address}, {self.__city}, {self.__state_province}) "

View File

@@ -2,3 +2,4 @@ records~=0.5.3
psycopg2-binary~=2.9.1
setuptools~=57.0.0
python-dateutil~=2.8.1
Flask~=2.0.1