Clients
The template already ships with a client for connecting to MongoDB databases. If you need a client that talks to another database, e.g. PostgreSQL, you have to add it yourself.
from typing import Any
from pymongo.cursor import Cursor
from pymongo.database import Database
from pymongo.errors import DuplicateKeyError
from pymongo.mongo_client import MongoClient
from pymongo.results import DeleteResult, InsertManyResult
from common.exceptions import NotFoundException, ValidationException
from config import config
from data_providers.clients.client_interface import ClientInterface
# Module-level MongoDB connection built from the application config.
# MongoDatabaseClient.__init__ uses it as the default value of its `client` argument.
MONGO_CLIENT: MongoClient[dict[str, Any]] = MongoClient(
    host=config.MONGODB_HOSTNAME,
    port=config.MONGODB_PORT,
    username=config.MONGODB_USERNAME,
    password=config.MONGODB_PASSWORD,
    authSource="admin",  # the credentials above are authenticated against the "admin" database
    tls=False,  # the connection is made without TLS
    connectTimeoutMS=5000,  # timeouts are given in milliseconds (5 seconds)
    serverSelectionTimeoutMS=5000,
    retryWrites=False,  # retryable writes are switched off
)
class MongoDatabaseClient(ClientInterface[dict, str]):
    """Client that stores dictionaries as documents in one MongoDB collection.

    Documents are addressed by the value of their ``_id`` field.
    """

    def __init__(self, collection_name: str, database_name: str, client: MongoClient[dict[str, Any]] = MONGO_CLIENT):
        """Bind the client to ``collection_name`` inside ``database_name``.

        Args:
            collection_name: Name of the collection the document operations act on.
            database_name: Name of the database that holds the collection.
            client: Connection to use; defaults to the module-level ``MONGO_CLIENT``.
        """
        database: Database[dict[str, Any]] = client[database_name]
        self.database = database
        self.collection_name = collection_name
        self.collection = database[collection_name]

    def wipe_db(self) -> None:
        """Drop every database on the server except ``admin``, ``config`` and ``local``.

        This is not limited to the database this client is bound to.
        """
        protected_databases = ("admin", "config", "local")  # Don't delete the mongo admin or local database
        mongo_client = self.database.client
        for database_name in mongo_client.list_database_names():
            if database_name not in protected_databases:
                mongo_client.drop_database(database_name)

    def delete_collection(self) -> None:
        """Drop the collection this client is bound to."""
        self.collection.drop()

    def create(self, document: dict[str, Any]) -> dict[str, Any]:
        """Insert ``document`` and return it as stored in the collection.

        Args:
            document: The document to insert.

        Returns:
            The inserted document, read back from the collection.

        Raises:
            ValidationException: If the insert is rejected with a duplicate key error.
            NotFoundException: If the document cannot be read back after the insert.
        """
        # Only the insert can raise DuplicateKeyError, so nothing else belongs in the try body.
        try:
            result = self.collection.insert_one(document)
        except DuplicateKeyError as error:
            raise ValidationException(
                message=f"The document with id '{document.get('_id')}' already exists"
            ) from error
        # Look the document up by the id exactly as it was stored; converting it to a
        # string first would never match an id that is not a string.
        return self.get(result.inserted_id)

    def list_collection(self) -> list[dict[str, Any]]:
        """Return all documents of the collection as a list."""
        return list(self.collection.find())

    def get(self, uid: str) -> dict[str, Any]:
        """Return the document whose ``_id`` equals ``uid``.

        Raises:
            NotFoundException: If no such document exists.
        """
        document = self.collection.find_one(filter={"_id": uid})
        if document is None:
            # Carry the id in the exception, the same way update() does.
            raise NotFoundException(extra={"uid": uid})
        return dict(document)

    def update(self, uid: str, document: dict[str, Any]) -> dict[str, Any]:
        """Replace the document whose ``_id`` equals ``uid`` with ``document``.

        Args:
            uid: Id of the document to replace.
            document: The replacement document.

        Returns:
            The document stored under ``uid`` after the replacement.

        Raises:
            NotFoundException: If no document with id ``uid`` exists.
        """
        # Let the replace itself report whether anything matched instead of checking
        # beforehand: one round trip less and no gap between the check and the write.
        result = self.collection.replace_one({"_id": uid}, document)
        if result.matched_count == 0:
            raise NotFoundException(extra={"uid": uid})
        return self.get(uid)

    def delete(self, uid: str) -> bool:
        """Delete the document whose ``_id`` equals ``uid``.

        Returns:
            True if a document was deleted, False if none matched.
        """
        result = self.collection.delete_one(filter={"_id": uid})
        return result.deleted_count > 0

    def find(self, filter: dict[str, Any]) -> Cursor[dict[str, Any]]:
        """Return a cursor over the documents matching ``filter``."""
        return self.collection.find(filter=filter)

    def find_one(self, filter: dict[str, Any]) -> dict[str, Any] | None:
        """Return one document matching ``filter``, or None if there is no match."""
        return self.collection.find_one(filter=filter)

    def insert_many(self, items: list[dict[str, Any]]) -> InsertManyResult:
        """Insert all ``items`` and return the driver's insert result."""
        return self.collection.insert_many(items)

    def delete_many(self, filter: dict[str, Any]) -> DeleteResult:
        """Delete every document matching ``filter`` and return the driver's delete result."""
        return self.collection.delete_many(filter)
Testing clients
The test_client fixture uses mongomock instead of a real database.
import pytest
from common.exceptions import NotFoundException, ValidationException
from data_providers.clients.mongodb.mongo_database_client import MongoDatabaseClient
class TestMongoDatabaseClient:
    """Tests for MongoDatabaseClient, run against the ``test_client`` fixture."""

    @staticmethod
    def _sample_documents() -> list[dict[str, str]]:
        """Return a fresh list of four documents, two of which share the name "alberto"."""
        return [
            {"_id": "81549300", "name": "hello"},
            {"_id": "1a2b", "name": "pingvin"},
            {"_id": "987321", "name": "alberto"},
            {"_id": "987456", "name": "alberto"},
        ]

    def test_find_one(self, test_client: MongoDatabaseClient):
        """find_one matches on id, on other fields, on both, and returns None otherwise."""
        document = {"_id": "81549300", "name": "hello"}
        test_client.collection.insert_one(document)
        assert test_client.find_one({"_id": document["_id"]}) == document
        assert test_client.find_one({"name": document["name"]}) == document
        assert test_client.find_one({"_id": document["_id"], "name": document["name"]}) == document
        assert test_client.find_one({"_id": "unknown"}) is None

    def test_create(self, test_client: MongoDatabaseClient):
        """create stores the document once and rejects a second insert with the same id."""
        document = {"_id": "987321", "name": "alberto"}
        assert test_client.collection.count_documents({}) == 0
        result = test_client.create(document)
        assert test_client.collection.count_documents({}) == 1
        assert result == document
        # try to create entry with already existing id:
        with pytest.raises(ValidationException):
            test_client.create(document)

    def test_get(self, test_client: MongoDatabaseClient):
        """get returns the stored document and raises for an unknown id."""
        document = {"_id": "81549300", "name": "hello"}
        test_client.collection.insert_one(document)
        assert test_client.get(document["_id"]) == document
        with pytest.raises(NotFoundException):
            test_client.get("unknown")

    def test_find(self, test_client: MongoDatabaseClient):
        """find returns all documents for an empty filter and only the matches otherwise."""
        documents = self._sample_documents()
        test_client.collection.insert_many(documents)
        assert list(test_client.find({})) == documents
        assert list(test_client.find({"name": "alberto"})) == [
            documents[2],
            documents[3],
        ]

    def test_list(self, test_client: MongoDatabaseClient):
        """list_collection returns every document of the collection."""
        documents = self._sample_documents()
        test_client.collection.insert_many(documents)
        assert test_client.list_collection() == documents

    def test_update(self, test_client: MongoDatabaseClient):
        """update replaces an existing document and leaves the data untouched for an unknown id."""
        document = {"_id": "987321", "name": "alberto"}
        test_client.collection.insert_one(document)
        instance_id = document["_id"]
        # Build the replacement as a separate dict so the "before" check below
        # really compares stored data with the original document.
        instance = {**document, "name": "Francois"}
        assert test_client.find_one({"_id": instance_id}) == document
        result = test_client.update(instance_id, instance)
        assert result == instance
        assert test_client.find_one({"_id": instance_id}) == instance
        # update with non-existing id
        entries = list(test_client.find({}))
        with pytest.raises(NotFoundException):
            test_client.update("unknown", instance)
        assert list(test_client.find({})) == entries

    def test_delete(self, test_client: MongoDatabaseClient):
        """delete removes the document and reports whether anything was removed."""
        document = {"_id": "987321", "name": "alberto"}
        test_client.collection.insert_one(document)
        assert test_client.delete(document["_id"]) is True
        assert test_client.collection.count_documents({}) == 0
        assert test_client.find_one({"_id": document["_id"]}) is None
        # try to delete the same entry again
        assert test_client.delete(document["_id"]) is False
        assert test_client.collection.count_documents({}) == 0

    def test_delete_collection(self, test_client: MongoDatabaseClient):
        """delete_collection drops only the client's own collection."""
        test_client.collection.insert_many(self._sample_documents())
        # add second collection to TestDB:
        test_client.database.create_collection("peppers")
        active_collections = test_client.database.list_collection_names()
        number_of_entries_in_original_collection = test_client.collection.count_documents({})
        assert number_of_entries_in_original_collection > 0
        assert len(active_collections) == 2
        test_client.delete_collection()
        assert test_client.database.list_collection_names() == ["peppers"]
        assert test_client.collection.count_documents({}) == 0

    def test_create_in_empty_database(self, test_client: MongoDatabaseClient):
        """create works when the collection holds no documents yet."""
        document = {"_id": "1a2b", "name": "pingvin"}
        assert test_client.collection.count_documents({}) == 0
        result = test_client.create(document)
        assert test_client.collection.count_documents({}) == 1
        assert result == document

    def test_wipe_db(self, test_client: MongoDatabaseClient):
        """wipe_db drops the test database but keeps the admin database."""
        test_client.collection.insert_many(self._sample_documents())
        mongo_client = test_client.database.client
        # Address the database by name rather than by its position in the listing.
        original_database = test_client.database.name
        collections_in_original_db = mongo_client[original_database].list_collection_names()
        # add admin database (admin collection should not be wiped)
        mongo_client["admin"].create_collection("vips")
        active_dbs = mongo_client.list_database_names()
        collections_in_admin_db = mongo_client["admin"].list_collection_names()
        assert len(active_dbs) == 2
        assert len(collections_in_original_db) == 1
        assert len(collections_in_admin_db) == 1
        test_client.wipe_db()
        assert mongo_client.list_database_names() == ["admin"]
        assert test_client.database[collections_in_original_db[0]].count_documents({}) == 0