Data Access Object¶
The models described in Model are unaware of how the remote REST API works and how to interact with it through HTTP requests.
Data Access Objects (from now on refered as DAOs) are used to trigger requests for a model in the REST API and map values in a transparent way. Requests handled by the DAO are typically CRUD.
Let’s assume that a Client API library already exists to create Employee models in the remote REST API:
from typing import Dict, Any
import json
import aiohttp
class ClientAPI:
session: aiohttp.ClientSession
url = "http://my-remote-rest-api"
def __init__(self):
self.session = aiohttp.ClientSession(raise_for_status=True)
async def create_employee(self, employee: Dict[str, Any]) -> int:
employees_url = f"{self.url}/employees"
payload = json.dumps(employee)
await self.session.post(employees_url, data=payload.encode())
new_employee_data = await self.get_employee_by_name(employee["name"])
return int(new_employee_data["key"])
async def get_employee_by_name(self, name: str) -> Dict[str, Any]:
employees_url = f"{self.url}/employees"
result = await self.session.get(employees_url)
employees = await result.json()
for employee in employees:
if employee["name"] == name:
return employee
raise RuntimeError(f"Employee with name {name} not found.")
We can now use a DAO to persist the model Employee using the ClientAPI:
from typing import FrozenSet, Optional
from restio.dao import BaseDAO
from restio.fields import FrozenSetModelField, FrozenType, IntField, StrField
from restio.model import BaseModel
class Employee(BaseModel):
key: IntField = IntField(pk=True, allow_none=True, frozen=FrozenType.ALWAYS)
name: StrField = StrField()
age: IntField = IntField(default=18)
address: StrField = StrField(default="Company Address")
def __init__(
self,
*,
name: str,
age: Optional[int] = None,
address: Optional[str] = None,
) -> None:
self.name = name
self.age = age or self.age # uses default
self.change_address(address or self.address) # uses default
def change_address(self, new_address: str):
if not new_address:
raise ValueError("Invalid address.")
self.address = new_address
class EmployeeDAO(BaseDAO[Employee]):
api = ClientAPI()
async def add(self, obj: Employee):
employee_dict = self._map_to_dict(obj)
key = await self.api.create_employee(employee_dict)
# update the model with the key generated on the server
obj.key = key
@staticmethod
def _map_to_dict(model: Employee) -> Dict[str, Any]:
return dict(name=model.name, age=model.age, address=model.address)
At this point, it is already possible to interface with the remote server through the DAO:
# We now connect an EmployeeDAO object to an Employee model
employee_dao = EmployeeDAO(Employee)
# create a new employee locally
employee_to_add = Employee(name="Carlos", age=52)
# persist the employe on the remote server
await employee_dao.add(employee_to_add)
# new key has been assigned to the object
employee_to_add.key # 1234
DAOs on their own are not very useful. They need to be associated with a Transaction instance in order to interact with restio properly (see Transaction for more detail).
In order to be used by a Transaction, BaseDAO contains 4 base methods that can potentially be overwritten: get, add, update and remove. None of these methods are purely abstract, which means that a DAOs can only have a few of them implemented. It is up to the developer to decide which methods to include.
| Method | Caller | Parameters | When |
|---|---|---|---|
| get | Transaction | Tuple with primary keys | Before, during or after a commit, when a model needs to be retrieved from the server |
| add | Transaction | Model object | During a commit, when a model is to be added to the server |
| update | Transaction | Model object | During a commit, when a model is to be updated in the server |
| remove | Transaction | Model object | During a commit, when a model is to be removed from the server |
A complete implementation of the EmployeeDAO and ClientAPI for all CRUD operations can be seen below:
from typing import Dict, Any, FrozenSet, Optional
import json
import aiohttp
from restio.dao import BaseDAO
from restio.fields import FrozenSetModelField, FrozenType, IntField, StrField
from restio.model import BaseModel
class ClientAPI:
session: aiohttp.ClientSession
url = "http://my-remote-rest-api"
def __init__(self):
self.session = aiohttp.ClientSession(raise_for_status=True)
async def get_employee(self, key: int) -> Dict[str, Any]:
employee_url = f"{self.url}/employees/{key}"
result = await self.session.get(employee_url)
return await result.json()
async def create_employee(self, employee: Dict[str, Any]) -> int:
employees_url = f"{self.url}/employees"
payload = json.dumps(employee)
await self.session.post(employees_url, data=payload.encode())
new_employee_data = await self.get_employee_by_name(employee["name"])
return int(new_employee_data["key"])
async def update_employee(self, key: int, employee: Dict[str, Any]):
employee_url = f"{self.url}/employees/{key}"
payload = json.dumps(employee)
await self.session.put(employee_url, data=payload.encode())
async def remove_employee(self, key: int):
employee_url = f"{self.url}/employees/{key}"
await self.session.delete(employee_url)
async def get_employee_by_name(self, name: str) -> Dict[str, Any]:
employees_url = f"{self.url}/employees"
result = await self.session.get(employees_url)
employees = await result.json()
for employee in employees:
if employee["name"] == name:
return employee
raise RuntimeError(f"Employee with name {name} not found.")
class Employee(BaseModel):
key: IntField = IntField(pk=True, allow_none=True, frozen=FrozenType.ALWAYS)
name: StrField = StrField()
age: IntField = IntField(default=18)
address: StrField = StrField(default="Company Address")
def __init__(
self,
*,
name: str,
age: Optional[int] = None,
address: Optional[str] = None,
) -> None:
self.name = name
self.age = age or self.age # uses default
self.change_address(address or self.address) # uses default
def change_address(self, new_address: str):
if not new_address:
raise ValueError("Invalid address.")
self.address = new_address
class EmployeeDAO(BaseDAO[Employee]):
api = ClientAPI()
# Employee only contains one pk `key`, so it must be an argument
async def get(self, *, key: str) -> Employee:
employee_data = await self.api.get_employee(key)
return self._map_from_dict(employee_data)
async def add(self, obj: Employee):
employee_dict = self._map_to_dict(obj)
key = await self.api.create_employee(employee_dict)
# update the model with the key generated on the server
obj.key = key
async def update(self, obj: Employee):
employee_dict = self._map_to_dict(obj)
await self.api.update_employee(obj.key, employee_dict)
async def remove(self, obj: Employee):
await self.api.remove_employee(obj.key)
@staticmethod
def _map_from_dict(data: Dict[str, Any]) -> Employee:
employee = Employee(name=str(data["name"]), age=int(data["age"]), address=str(data["address"]))
employee.key = int(data["key"])
return employee
@staticmethod
def _map_to_dict(model: Employee) -> Dict[str, Any]:
return dict(name=model.name, age=model.age, address=model.address)