Source code for sweetrpg_db.mongodb.repo

# -*- coding: utf-8 -*-
__author__ = "Paul Schifferer <dm@sweetrpg.com>"
"""MongoDB repository module.
"""

from ..exceptions import ObjectNotFound
from bson.objectid import ObjectId
from bson.timestamp import Timestamp
import datetime
from .options import QueryOptions
from pymongo.write_concern import WriteConcern
import logging
from mongoengine.queryset import QuerySet
from mongoengine import Document


class MongoDataRepository(object):
    """A repository class for interacting with a MongoDB database.

    All reads and writes go through the MongoEngine document class supplied at
    construction time. A record counts as "deleted" when its ``deleted_at``
    field holds a date; such records are hidden from reads unless the caller
    explicitly asks for them.
    """

    def __init__(self, **kwargs):
        """Create a MongoDB repository instance.

        :param kwargs: Keyword arguments for setting up the repository.
        :key model: The class of the model for this repository.
        :key document: The class of the document for this repository.
        :key collection: The name of the collection backing the document.
        :raises KeyError: If any of the keys above is missing.
        """
        self.model_class = kwargs["model"]
        self.document_class = kwargs["document"]
        self.collection = kwargs["collection"]

    def __repr__(self):
        """Return a debugging representation listing the configured classes."""
        return f"<MongoDataRepository(model_class={self.model_class}, document_class={self.document_class}, collection={self.collection})>"

    @staticmethod
    def _to_object_id(record_id):
        """Coerce a string identifier to an ObjectId; pass other values through."""
        if isinstance(record_id, str):
            return ObjectId(record_id)
        return record_id

    @staticmethod
    def _not_deleted_clause() -> dict:
        """Build a fresh raw-filter fragment that excludes soft-deleted records."""
        return {"deleted_at": {"$not": {"$type": "date"}}}

    def _handle_value(self, value):
        """Convert a database value to a serialisable form.

        :param any value: The value to convert. Supports
            :class:`bson.objectid.ObjectId`, :class:`datetime.datetime`,
            :class:`bson.timestamp.Timestamp`, and (nested) lists of any of
            those types.
        :return: A string for the supported scalar types, a list of converted
            items for a list, and the value itself for anything else.
        """
        if isinstance(value, datetime.datetime):
            logging.debug("converting datetime value '%s'...", value)
            if value.utcoffset() is None:
                # Naive values are taken to already be in UTC; just label them.
                utc_value = value.replace(tzinfo=datetime.timezone.utc)
            else:
                # Aware values must be converted: relabelling the zone with
                # replace() would silently shift the instant they denote.
                utc_value = value.astimezone(datetime.timezone.utc)
            return utc_value.isoformat(timespec="milliseconds")

        if isinstance(value, list):
            logging.debug("converting list '%s'...", value)
            return [self._handle_value(item) for item in value]

        if isinstance(value, ObjectId):
            logging.debug("converting ObjectId('%s')...", value)
            return str(value)

        if isinstance(value, Timestamp):
            logging.debug("converting Timestamp '%s'...", value)
            # Run the datetime through the datetime branch so that timestamps
            # come out as strings like every other supported type.
            return self._handle_value(value.as_datetime())

        logging.debug("returning unprocessed value '%s'...", value)
        return value

    def _modify_record(self, record: dict) -> dict:
        """Return a copy of a record with converted values and ``_id`` renamed.

        :param dict record: The record to modify. It is not changed in place.
        :return dict: A new dict whose values have been passed through
            :meth:`_handle_value` and whose ``_id`` key is renamed to ``id``.
        """
        modified_record = {}
        for key, raw_value in record.items():
            logging.debug("k: %s, v: %s", key, raw_value)
            target_key = "id" if key == "_id" else key
            converted = self._handle_value(raw_value)
            modified_record[target_key] = converted
            logging.debug("k: %s, v (modified): %s", target_key, converted)
        return modified_record

    def _adjust_sort(self, sort: tuple) -> str:
        """Translate a ``(field, direction)`` pair into an ``order_by`` key.

        :param tuple sort: The field name followed by a numeric direction.
        :return str: The field name prefixed with ``-`` when the direction is
            negative, and with ``+`` otherwise.
        """
        field, direction = sort[0], sort[1]
        prefix = "-" if direction < 0 else "+"
        return f"{prefix}{field}"

    def create(self, data: dict) -> "Document":
        """Insert a new object in the database with the data provided.

        :param dict data: The data for the object, passed as keyword arguments
            to the document class.
        :return Document: The inserted document.
        """
        logging.debug("data: %s", data)
        logging.info("Creating new %s record with data %s...", self.document_class.__name__, data)
        doc = self.document_class(**data)
        logging.debug("doc: %s", doc)
        doc.validate()
        doc.save()
        logging.debug("saved doc: %s", doc)
        return doc

    def get(self, record_id: str, deleted: bool = False) -> "Document":
        """Fetch a single record from the database.

        :param str record_id: The identifier of the record to fetch. A string
            is converted to :class:`bson.objectid.ObjectId`; any other value
            is compared against ``_id`` as given.
        :param bool deleted: Include "deleted" objects in the query.
        :return Document: The matching document, or ``None`` when no record
            matches.
        """
        logging.debug("record_id: %s", record_id)
        id_value = self._to_object_id(record_id)
        logging.debug("id_value: %s", id_value)

        query_filter = {"_id": id_value}
        if not deleted:
            query_filter.update(self._not_deleted_clause())
        logging.debug("query_filter: %s", query_filter)

        logging.info("Fetching %s record for ID %s...", self.document_class.__name__, id_value)
        record = self.document_class.objects(__raw__=query_filter).first()
        logging.debug("record: %s", record)
        return record

    def query(self, options: "QueryOptions", deleted: bool = False) -> list:
        """Perform a query for objects in the database.

        :param QueryOptions options: Filters, sort order, skip, limit and
            projection for the query.
        :param bool deleted: Include "deleted" objects in the query.
        :return list: A list of Document-subclass instances matching the query.
        """
        logging.debug("options: %s", options)

        # Work on a copy: adding the soft-delete clause to the caller's own
        # filter dict would leak it into any later reuse of ``options``.
        query_filter = dict(options.filters or {})
        if not deleted:
            query_filter.update(self._not_deleted_clause())
        logging.debug("query_filter: %s", query_filter)

        logging.info("Searching for %s records matching filter %s...", self.document_class, query_filter)
        sort_keys = [self._adjust_sort(entry) for entry in options.sort]
        records = (
            self.document_class.objects(__raw__=query_filter)
            .order_by(*sort_keys)
            .skip(options.skip)
            .limit(options.limit)
            .only(*options.projection)
        )
        logging.debug("records: %s", records)
        return list(records)

    def update(self, record_id: str, update: dict, deleted: bool = False) -> "Document":
        """Update the specified record.

        :param str record_id: The ID of the record to update.
        :param dict update: The data to update for the record, passed as
            keyword arguments to the document's ``update`` method.
        :param bool deleted: Indicates whether the update operation should
            look for deleted records.
        :return Document: The updated version of the object.
        :raises ObjectNotFound: If no matching record exists.
        """
        id_value = self._to_object_id(record_id)
        logging.debug("update: %s", update)

        logging.info("Updating %s record %s...", self.model_class, id_value)
        doc = self.get(record_id, deleted=deleted)
        logging.debug("doc: %s", doc)
        if doc is None:
            raise ObjectNotFound(f"Record not found for '{record_id}'")

        doc.update(**update)
        # The update is applied in the database only; reload so the instance
        # handed back actually reflects the new values.
        doc.reload()
        return doc

    def delete(self, record_id: str, actually: bool = False) -> bool:
        """'Delete' the specified record.

        By default deletion is accomplished by setting the `deleted_at` field
        to the current UTC time, so that queries for the object will ignore it.

        :param str record_id: The record ID of the object to delete. This can
            be a string or :class:`bson.objectid.ObjectId`.
        :param bool actually: Actually delete the record instead of just
            marking it "deleted".
        :return bool: ``True`` when the record was deleted or marked deleted,
            ``False`` when no matching record was found.
        """
        id_value = self._to_object_id(record_id)

        # NOTE(review): the lookup skips soft-deleted records, so
        # ``actually=True`` cannot purge a record that is already marked
        # deleted - confirm whether that is intended.
        doc = self.get(record_id)
        if doc is None:
            logging.info("No document found to delete for record ID %s.", record_id)
            return False

        if actually:
            logging.info("Deleting %s record %s...", self.model_class.__name__, id_value)
            doc.delete()
            return True

        logging.info("Marking %s record %s deleted...", self.model_class.__name__, id_value)
        # Same naive-UTC value that utcnow() produced, without the deprecated call.
        doc.deleted_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        updated_doc = doc.save()
        logging.debug("updated_doc: %s", updated_doc)
        return True