Clients
The template already ships with a mongo database client for connecting to MongoDB databases. However, if you need a client that can talk to e.g. PostgreSQL you need to add this.
from typing import Any
from pymongo.cursor import Cursor
from pymongo.database import Database
from pymongo.errors import DuplicateKeyError
from pymongo.mongo_client import MongoClient
from pymongo.results import DeleteResult, InsertManyResult
from common.exceptions import NotFoundException, ValidationException
from config import config
from data_providers.clients.client_interface import ClientInterface
MONGO_CLIENT: MongoClient[dict[str, Any]] = MongoClient(
host=config.MONGODB_HOSTNAME,
port=config.MONGODB_PORT,
username=config.MONGODB_USERNAME,
password=config.MONGODB_PASSWORD,
authSource="admin",
tls=False,
connectTimeoutMS=5000,
serverSelectionTimeoutMS=5000,
retryWrites=False,
)
class MongoDatabaseClient(ClientInterface[dict, str]):
def __init__(self, collection_name: str, database_name: str, client: MongoClient[dict[str, Any]] = MONGO_CLIENT):
database: Database[dict[str, Any]] = client[database_name]
self.database = database
self.collection_name = collection_name
self.collection = database[collection_name]
def wipe_db(self) -> None:
databases = self.database.client.list_database_names()
databases_to_delete = [
database_name for database_name in databases if database_name not in ("admin", "config", "local")
] # Don't delete the mongo admin or local database
for database_name in databases_to_delete:
self.database.client.drop_database(database_name)
def delete_collection(self) -> None:
self.collection.drop()
def create(self, document: dict[str, Any]) -> dict[str, Any]:
try:
result = self.collection.insert_one(document)
return self.get(str(result.inserted_id))
except DuplicateKeyError:
raise ValidationException(message=f"The document with id '{document['_id']}' already exists")
def list_collection(self) -> list[dict[str, Any]]:
return list(self.collection.find())
def get(self, uid: str) -> dict[str, Any]:
document = self.collection.find_one(filter={"_id": uid})
if document is None:
raise NotFoundException
else:
return dict(document)
def update(self, uid: str, document: dict[str, Any]) -> dict[str, Any]:
if self.collection.find_one(filter={"_id": uid}) is None:
raise NotFoundException(extra={"uid": uid})
self.collection.replace_one({"_id": uid}, document)
return self.get(uid)
def delete(self, uid: str) -> bool:
result = self.collection.delete_one(filter={"_id": uid})
return result.deleted_count > 0
def find(self, filter: dict[str, Any]) -> Cursor[dict[str, Any]]:
return self.collection.find(filter=filter)
def find_one(self, filter: dict[str, Any]) -> dict[str, Any] | None:
return self.collection.find_one(filter=filter)
def insert_many(self, items: list[dict[str, Any]]) -> InsertManyResult:
return self.collection.insert_many(items)
def delete_many(self, filter: dict[str, Any]) -> DeleteResult:
return self.collection.delete_many(filter)
Testing clients
The test_client
fixture are using the mongomock instead of real database.
import pytest
from common.exceptions import NotFoundException, ValidationException
from data_providers.clients.mongodb.mongo_database_client import MongoDatabaseClient
class TestMongoDatabaseClient:
def test_find_one(self, test_client: MongoDatabaseClient):
document = {"_id": "81549300", "name": "hello"}
test_client.collection.insert_one(document)
assert test_client.find_one({"_id": document["_id"]}) == document
assert test_client.find_one({"name": document["name"]}) == document
assert test_client.find_one({"_id": document["_id"], "name": document["name"]}) == document
assert test_client.find_one({"_id": "unknown"}) is None
def test_create(self, test_client: MongoDatabaseClient):
document = {"_id": "987321", "name": "alberto"}
assert test_client.collection.count_documents({}) == 0
result = test_client.create(document)
assert test_client.collection.count_documents({}) == 1
assert result == document
# try to create entry with already existing id:
with pytest.raises(ValidationException):
test_client.create(document)
def test_get(self, test_client: MongoDatabaseClient):
document = {"_id": "81549300", "name": "hello"}
test_client.collection.insert_one(document)
assert test_client.get(document["_id"]) == document
with pytest.raises(NotFoundException):
test_client.get("unknown")
def test_find(self, test_client: MongoDatabaseClient):
documents = [
{"_id": "81549300", "name": "hello"},
{"_id": "1a2b", "name": "pingvin"},
{"_id": "987321", "name": "alberto"},
{"_id": "987456", "name": "alberto"},
]
test_client.collection.insert_many(documents)
assert list(test_client.find({})) == documents
assert list(test_client.find({"name": "alberto"})) == [
documents[2],
documents[3],
]
def test_list(self, test_client: MongoDatabaseClient):
documents = [
{"_id": "81549300", "name": "hello"},
{"_id": "1a2b", "name": "pingvin"},
{"_id": "987321", "name": "alberto"},
{"_id": "987456", "name": "alberto"},
]
test_client.collection.insert_many(documents)
assert test_client.list_collection() == documents
def test_update(self, test_client: MongoDatabaseClient):
document = {"_id": "987321", "name": "alberto"}
test_client.collection.insert_one(document)
instance = document
instance_id = instance["_id"]
instance["name"] = "Francois"
assert test_client.find_one({"_id": instance_id}) != instance["name"]
result = test_client.update(instance_id, instance)
assert result["name"] == instance["name"]
assert test_client.find_one({"_id": instance_id}) == instance
# update with non-existing id
entries = list(test_client.find({}))
with pytest.raises(NotFoundException):
assert test_client.update("instance_id", instance)
assert list(test_client.find({})) == entries
def test_delete(self, test_client: MongoDatabaseClient):
document = {"_id": "987321", "name": "alberto"}
test_client.collection.insert_one(document)
test_client.delete(document["_id"])
assert test_client.collection.count_documents({}) == 0
assert test_client.find_one({"_id": document["_id"]}) is None
# try to delete the same entry again
test_client.delete(document["_id"])
assert test_client.collection.count_documents({}) == 0
def test_delete_collection(self, test_client: MongoDatabaseClient):
documents = [
{"_id": "81549300", "name": "hello"},
{"_id": "1a2b", "name": "pingvin"},
{"_id": "987321", "name": "alberto"},
{"_id": "987456", "name": "alberto"},
]
test_client.collection.insert_many(documents)
# add second collection to TestDB:
test_client.database.create_collection("peppers")
active_collections = test_client.database.list_collection_names()
number_of_entries_in_original_collection = test_client.collection.count_documents({})
assert number_of_entries_in_original_collection > 0
assert len(active_collections) == 2
test_client.delete_collection()
assert test_client.database.list_collection_names() == ["peppers"]
assert test_client.collection.count_documents({}) == 0
def test_create_in_empty_database(self, test_client: MongoDatabaseClient):
document = {"_id": "1a2b", "name": "pingvin"}
assert test_client.collection.count_documents({}) == 0
result = test_client.create(document)
assert test_client.collection.count_documents({}) == 1
assert result == document
def test_wipe_db(self, test_client: MongoDatabaseClient):
documents = [
{"_id": "81549300", "name": "hello"},
{"_id": "1a2b", "name": "pingvin"},
{"_id": "987321", "name": "alberto"},
{"_id": "987456", "name": "alberto"},
]
test_client.collection.insert_many(documents)
original_database = test_client.database.client.list_database_names()[0]
collections_in_original_db = test_client.database.client[original_database].list_collection_names()
# add admin database (admin collection should not be wiped)
test_client.database.client["admin"].create_collection("vips")
active_dbs = test_client.database.client.list_database_names()
collections_in_admin_db = test_client.database.client["admin"].list_collection_names()
assert len(active_dbs) == 2
assert len(collections_in_original_db) == 1
assert len(collections_in_admin_db) == 1
test_client.wipe_db()
assert test_client.database.client.list_database_names() == ["admin"]
assert test_client.database[collections_in_original_db[0]].count_documents({}) == 0