Data Access Object

The models described in Model are unaware of how the remote REST API works and how to interact with it through HTTP requests.

Data Access Objects (from now on refered as DAOs) are used to trigger requests for a model in the REST API and map values in a transparent way. Requests handled by the DAO are typically CRUD.

Let’s assume that a Client API library already exists to create Employee models in the remote REST API:

from typing import Dict, Any

import json
import aiohttp


class ClientAPI:
    session: aiohttp.ClientSession
    url = "http://my-remote-rest-api"

    def __init__(self):
        self.session = aiohttp.ClientSession(raise_for_status=True)

    async def create_employee(self, employee: Dict[str, Any]) -> int:
        employees_url = f"{self.url}/employees"
        payload = json.dumps(employee)

        response = await self.session.post(employees_url, data=payload.encode())

        # get the key created by the server
        location = response.headers["Location"]
        key = location.split("/")[-1]

        return int(key)

We can now use a DAO to persist the model Employee using the ClientAPI:

from typing import FrozenSet, Optional

from restio.dao import BaseDAO
from restio.fields import FrozenSetModelField, FrozenType, IntField, StrField
from restio.model import BaseModel


class Employee(BaseModel):
    key: IntField = IntField(pk=True, allow_none=True, frozen=FrozenType.ALWAYS)
    name: StrField = StrField()
    age: IntField = IntField(default=18)
    address: StrField = StrField(default="Company Address")

    @address.setter
    def _validate_address(self, address: str):
        if not address:
            raise ValueError("Invalid address.")
        return address


class EmployeeDAO(BaseDAO[Employee]):
    api = ClientAPI()

    async def add(self, obj: Employee):
        employee_dict = self._map_to_dict(obj)
        key = await self.api.create_employee(employee_dict)

        # update the model with the key generated on the server
        obj.key = key

    @staticmethod
    def _map_to_dict(model: Employee) -> Dict[str, Any]:
        return dict(name=model.name, age=model.age, address=model.address)

At this point, it is already possible to interface with the remote server through the DAO:

# We now connect an EmployeeDAO object to an Employee model
employee_dao = EmployeeDAO(Employee)

# create a new employee locally
employee_to_add = Employee(name="Carlos", age=52)

# persist the employe on the remote server
await employee_dao.add(employee_to_add)

# new key has been assigned to the object
employee_to_add.key  # 1234

DAOs on their own are not very useful. They need to be associated with a Session instance in order to interact with restio properly (see Session for more detail).

In order to be used by a Session, BaseDAO contains 4 base methods that can potentially be overwritten: get, add, update and remove. None of these methods are purely abstract, which means that a DAOs can only have a few of them implemented. It is up to the developer to decide which methods to include.

Method Caller | Parameters | When
get
Session | Tuple with primary keys | Before, during or after a commit, when a model needs to
| be retrieved from the server
add Session | Model object | During a commit, when a model is to be added to the server
update Session | Model object | During a commit, when a model is to be updated in the server
remove Session | Model object | During a commit, when a model is to be removed from the server

A complete implementation of the EmployeeDAO and ClientAPI for all CRUD operations can be seen below:

from typing import Dict, Any, FrozenSet, Optional

import json
import aiohttp

from restio.dao import BaseDAO
from restio.fields import FrozenSetModelField, FrozenType, IntField, StrField
from restio.model import BaseModel


class ClientAPI:
    session: aiohttp.ClientSession
    url = "http://my-remote-rest-api"

    def __init__(self):
        self.session = aiohttp.ClientSession(raise_for_status=True)

    async def get_employee(self, key: int) -> Dict[str, Any]:
        employee_url = f"{self.url}/employees/{key}"
        result = await self.session.get(employee_url)
        return await result.json()

    async def create_employee(self, employee: Dict[str, Any]) -> int:
        employees_url = f"{self.url}/employees"
        payload = json.dumps(employee)

        response = await self.session.post(employees_url, data=payload.encode())

        # get the key created by the server
        location = response.headers["Location"]
        key = location.split("/")[-1]

        return int(key)

    async def update_employee(self, key: int, employee: Dict[str, Any]):
        employee_url = f"{self.url}/employees/{key}"
        payload = json.dumps(employee)
        await self.session.put(employee_url, data=payload.encode())

    async def remove_employee(self, key: int):
        employee_url = f"{self.url}/employees/{key}"
        await self.session.delete(employee_url)


class Employee(BaseModel):
    key: IntField = IntField(pk=True, allow_none=True, frozen=FrozenType.ALWAYS)
    name: StrField = StrField()
    age: IntField = IntField(default=18)
    address: StrField = StrField(default="Company Address")

    @address.setter
    def _validate_address(self, address: str):
        if not address:
            raise ValueError("Invalid address.")
        return address


class EmployeeDAO(BaseDAO[Employee]):
    api = ClientAPI()

    # Employee only contains one pk `key`, so it must be an argument
    async def get(self, *, key: int) -> Employee:
        employee_data = await self.api.get_employee(key)
        return self._map_from_dict(employee_data)

    async def add(self, obj: Employee):
        employee_dict = self._map_to_dict(obj)
        key = await self.api.create_employee(employee_dict)

        # update the model with the key generated on the server
        obj.key = key

    async def update(self, obj: Employee):
        employee_dict = self._map_to_dict(obj)
        await self.api.update_employee(obj.key, employee_dict)

    async def remove(self, obj: Employee):
        await self.api.remove_employee(obj.key)

    @staticmethod
    def _map_from_dict(data: Dict[str, Any]) -> Employee:
        employee = Employee(name=str(data["name"]), age=int(data["age"]), address=str(data["address"]))
        employee.key = int(data["key"])
        return employee

    @staticmethod
    def _map_to_dict(model: Employee) -> Dict[str, Any]:
        return dict(name=model.name, age=model.age, address=model.address)