63 lines
2.3 KiB
Python
63 lines
2.3 KiB
Python
from typing import Union
|
|
|
|
from bson import ObjectId
|
|
from motor.motor_asyncio import AsyncIOMotorDatabase
|
|
|
|
|
|
class CRUDBase:
    """Thin async CRUD helper bound to one MongoDB collection name.

    Every method receives the database handle as its first argument and
    resolves the collection with ``db[self.coll_name]``, so one instance
    can be reused with different database handles.
    """

    # Upper bound used when a cursor is materialised into a list.
    DEFAULT_LIST_LENGTH = 9999

    def __init__(self, coll_name):
        """Store the name of the collection this helper operates on."""
        self.coll_name = coll_name

    def _collection(self, db):
        """Return the collection object for ``self.coll_name`` from *db*."""
        return db[self.coll_name]

    async def get(self, db: "AsyncIOMotorDatabase", id: "Union[ObjectId, str]") -> dict:
        """Return the document whose ``_id`` equals *id*, or ``{}`` if none.

        *id* is placed in the query exactly as given; a ``str`` is not
        converted to ``ObjectId`` here.
        """
        return (await self._collection(db).find_one({'_id': id})) or {}

    async def insert_one(self, db: "AsyncIOMotorDatabase", document):
        """Insert *document* and return the driver's insert result."""
        return await self._collection(db).insert_one(document)

    async def find_one(self, db: "AsyncIOMotorDatabase", filter=None, *args, **kwargs) -> dict:
        """Return the first document matching *filter*, or ``{}`` if none.

        Extra positional and keyword arguments are forwarded unchanged to
        the collection's ``find_one``.
        """
        return (await self._collection(db).find_one(filter, *args, **kwargs)) or {}

    async def exists(self, db: "AsyncIOMotorDatabase", filter=None, *args, **kwargs) -> bool:
        """Return ``True`` when at least one document matches *filter*.

        Extra arguments are forwarded unchanged to ``find_one``.
        """
        doc = await self._collection(db).find_one(filter, *args, **kwargs)
        # Compare against None rather than truthiness: a projection can
        # leave a matched document as an empty dict, which is still a match.
        return doc is not None

    async def read_have(self, db: "AsyncIOMotorDatabase", v: str, **kwargs) -> list:
        """Return documents matching ``{'members': v}`` plus *kwargs*.

        Each keyword argument becomes an additional field condition in the
        filter (a ``members`` keyword overrides *v*).  At most
        ``DEFAULT_LIST_LENGTH`` documents are returned.
        """
        where = {'members': v, **kwargs}
        cursor = self._collection(db).find(where)
        return await cursor.to_list(length=self.DEFAULT_LIST_LENGTH)

    async def find_many(self, db: "AsyncIOMotorDatabase", *args,
                        length=DEFAULT_LIST_LENGTH, **kwargs) -> list:
        """Run ``find(*args, **kwargs)`` and return the results as a list.

        Args:
            length: Keyword-only value handed to the cursor's ``to_list``;
                defaults to ``DEFAULT_LIST_LENGTH`` so existing callers
                keep the previous cap.  It is not forwarded to ``find``.
        """
        cursor = self._collection(db).find(*args, **kwargs)
        return await cursor.to_list(length=length)

    def find(self, db: "AsyncIOMotorDatabase", *args, **kwargs):
        """Return the cursor produced by ``find(*args, **kwargs)``.

        This method is synchronous; nothing is awaited here and the
        caller is responsible for consuming the cursor.
        """
        return self._collection(db).find(*args, **kwargs)

    @staticmethod
    async def to_list(cursor):
        """Yield each document from *cursor* in order.

        NOTE: despite the name this is an async generator, not a coroutine
        returning a list; consume it with ``async for``.
        """
        async for doc in cursor:
            yield doc

    async def delete(self, db: "AsyncIOMotorDatabase", filter,
                     collation=None, hint=None, session=None):
        """Delete every document matching *filter*; return the driver result.

        The optional arguments are passed to ``delete_many`` by keyword so
        they cannot be bound to the wrong driver parameter.
        """
        return await self._collection(db).delete_many(
            filter, collation=collation, hint=hint, session=session
        )

    async def delete_id(self, db: "AsyncIOMotorDatabase", *args):
        """Delete the documents whose ``_id`` is one of *args*."""
        return await self._collection(db).delete_many({'_id': {'$in': list(args)}})

    async def update_one(self, db: "AsyncIOMotorDatabase", filter, update, upsert=False):
        """Apply *update* to one document matching *filter*.

        Returns the driver's update result; *upsert* is forwarded by keyword.
        """
        return await self._collection(db).update_one(filter, update, upsert=upsert)

    async def update_many(self, db: "AsyncIOMotorDatabase", filter, update, upsert=False):
        """Apply *update* to every document matching *filter*.

        Returns the driver's update result; *upsert* is forwarded by keyword.
        """
        return await self._collection(db).update_many(filter, update, upsert=upsert)

    async def distinct(self, db: "AsyncIOMotorDatabase", key, filter=None):
        """Return the result of ``distinct(key)`` restricted by *filter*."""
        return await self._collection(db).distinct(key, filter=filter)

    async def find_ids(self, db: "AsyncIOMotorDatabase", ids: list, *args, **kwargs) -> list:
        """Return the documents whose ``_id`` is contained in *ids*.

        *ids* is copied into a list so tuples and other iterables are
        accepted.  Remaining arguments go to ``find_many``.
        """
        return await self.find_many(db, {'_id': {'$in': list(ids)}}, *args, **kwargs)
|
|
# async def _create_index(self, db: AsyncIOMotorDatabase, *args, **kwargs):
|
|
# return await db[self.coll_name].create_index(*args, **kwargs)
|