This repository has been archived by the owner on Sep 9, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
b652724
commit 2547c18
Showing
1 changed file
with
87 additions
and
61 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,135 +1,161 @@ | ||
from sqlite3 import connect, Row | ||
from aiosqlite import connect, Row | ||
from asyncinit import asyncinit | ||
|
||
|
||
class Database: | ||
def __init__(self, db_file): | ||
self.connection = connect(db_file, check_same_thread=False) | ||
self.connection.row_factory = Row | ||
self.cursor = self.connection.cursor() | ||
self.connection.execute("""CREATE TABLE IF NOT EXISTS jokes ( | ||
@asyncinit | ||
class Database(): | ||
async def __init__(self, db_file): | ||
self.db_file = db_file | ||
async with connect(self.db_file) as db: | ||
db.row_factory = Row | ||
await db.execute("""CREATE TABLE IF NOT EXISTS jokes ( | ||
user_id INTEGER, | ||
joke TEXT, | ||
author TEXT | ||
)""") | ||
self.connection.execute("""CREATE TABLE IF NOT EXISTS newJokes ( | ||
await db.execute("""CREATE TABLE IF NOT EXISTS newJokes ( | ||
user_id INTEGER, | ||
joke TEXT, | ||
author TEXT | ||
)""") | ||
self.connection.execute("""CREATE TABLE IF NOT EXISTS users ( | ||
await db.execute("""CREATE TABLE IF NOT EXISTS users ( | ||
user_id INTEGER | ||
)""") | ||
|
||
async def recordJoke(self, joke, author, user_id): | ||
"""Запись шутки""" | ||
with self.connection: | ||
async with connect(self.db_file) as db: | ||
rowid = await self.rowid(user_id) | ||
moreShows = [(rowid, joke, author)] | ||
self.cursor.executemany( | ||
await db.executemany( | ||
"INSERT INTO jokes (user_id,joke,author) VALUES (?, ?, ?)", moreShows) | ||
self.cursor.executemany( | ||
await db.executemany( | ||
"INSERT INTO newJokes (user_id,joke,author) VALUES (?, ?, ?)", moreShows) | ||
await db.commit() | ||
|
||
async def randomJoke(self): | ||
"""Отправка рандомной шутки от пользователей бота""" | ||
with self.connection: | ||
records = self.cursor.execute( | ||
"SELECT joke, author FROM jokes ORDER BY RANDOM() LIMIT 1").fetchmany(1) | ||
if not bool(len(records)): | ||
return "Нету шуток 😞, но ты можешь записать свою шутку 😉" | ||
for row in records: | ||
return f'{row["joke"]} Автор: {row["author"]}' | ||
async with connect(self.db_file) as db: | ||
db.row_factory = Row | ||
async with db.execute( | ||
"SELECT joke, author FROM jokes ORDER BY RANDOM() LIMIT 1") as cursor: | ||
async for row in cursor: | ||
if not bool(len(row)): | ||
return "Нету шуток 😞, но ты можешь записать свою шутку 😉" | ||
return f'{row["joke"]} Автор: {row["author"]}' | ||
|
||
async def myJoke(self, user_id): | ||
"""Просмотр своих шуток""" | ||
with self.connection: | ||
async with connect(self.db_file) as db: | ||
db.row_factory = Row | ||
rowid = await self.rowid(user_id) | ||
records = self.cursor.execute( | ||
f"SELECT joke, author FROM jokes WHERE user_id = '%s'" % rowid).fetchall() | ||
msg = '' | ||
if not bool(len(records)): | ||
async with db.execute( | ||
f"SELECT joke, author FROM jokes WHERE user_id = '%s'" % rowid) as cursor: | ||
msg = '' | ||
async for row in cursor: | ||
msg += f'{row["joke"]}\n\n' | ||
if not bool(len(msg)): | ||
return "Нету шуток 😞, но ты можешь записать свою шутку 😉" | ||
for row in records: | ||
msg += f'{row["joke"]}\n\n' | ||
return msg | ||
|
||
async def newsJoke(self): | ||
"""Вывод последней шутки""" | ||
with self.connection: | ||
records = self.cursor.execute( | ||
"SELECT * FROM newJokes LIMIT 1").fetchmany(1) | ||
for row in records: | ||
return row | ||
async with connect(self.db_file) as db: | ||
db.row_factory = Row | ||
async with db.execute( | ||
"SELECT * FROM newJokes LIMIT 1") as cursor: | ||
async for row in cursor: | ||
return row | ||
|
||
async def deleteOldJoke(self): | ||
"""Удаление старой шутки""" | ||
with self.connection: | ||
self.cursor.execute( | ||
async with connect(self.db_file) as db: | ||
await db.execute( | ||
"DELETE FROM newJokes where ROWID = 1") | ||
await db.commit() | ||
|
||
async def newsJokesExists(self): | ||
"""Проверка шуток""" | ||
with self.connection: | ||
result = self.cursor.execute( | ||
"SELECT count(*) FROM newJokes").fetchone()["count(*)"] | ||
if result < 1: | ||
return False | ||
return True | ||
async with connect(self.db_file) as db: | ||
db.row_factory = Row | ||
async with db.execute( | ||
"SELECT count(*) FROM newJokes") as cursor: | ||
async for row in cursor: | ||
if row["count(*)"] < 1: | ||
return False | ||
return True | ||
|
||
async def quantityUsers(self): | ||
"""Количество пользователей""" | ||
with self.connection: | ||
return self.cursor.execute("SELECT count(*) FROM users").fetchone()["count(*)"] | ||
async with connect(self.db_file) as db: | ||
db.row_factory = Row | ||
async with db.execute("SELECT count(*) FROM users") as cursor: | ||
async for row in cursor: | ||
return row["count(*)"] | ||
|
||
async def quantityJokesUser(self, user_id): | ||
"""Количество шуток у пользователя""" | ||
with self.connection: | ||
async with connect(self.db_file) as db: | ||
db.row_factory = Row | ||
rowid = await self.rowid(user_id) | ||
return self.cursor.execute("SELECT count(*) FROM jokes WHERE user_id = ?", (rowid,)).fetchone()["count(*)"] | ||
async with db.execute("SELECT count(*) FROM jokes WHERE user_id = ?", (rowid,)) as cursor: | ||
async for row in cursor: | ||
return row["count(*)"] | ||
|
||
async def userExists(self, user_id): | ||
"""Проверка пользовотеля""" | ||
with self.connection: | ||
result = self.cursor.execute( | ||
"SELECT * FROM users WHERE user_id = ?", (user_id,)).fetchmany(1) | ||
return bool(len(result)) | ||
async with connect(self.db_file) as db: | ||
db.row_factory = Row | ||
async with db.execute( | ||
"SELECT * FROM users WHERE user_id = ?", (user_id,)) as cursor: | ||
async for row in cursor: | ||
return bool(len(row)) | ||
|
||
async def userAdd(self, user_id): | ||
"""Добавление пользователя""" | ||
with self.connection: | ||
self.cursor.execute( | ||
async with connect(self.db_file) as db: | ||
await db.execute( | ||
"INSERT INTO users (user_id) VALUES (?)", (user_id,)) | ||
await db.commit() | ||
|
||
async def infoId(self, id): | ||
"""Просмотр пользователя""" | ||
with self.connection: | ||
return self.cursor.execute("SELECT * FROM users WHERE ROWID = ?", (id,)).fetchone()["user_id"] | ||
async with connect(self.db_file) as db: | ||
db.row_factory = Row | ||
async with db.execute("SELECT * FROM users WHERE ROWID = ?", (id,)) as cursor: | ||
async for row in cursor: | ||
return row["user_id"] | ||
|
||
async def rowid(self, user_id): | ||
"""Поиск пользователя""" | ||
with self.connection: | ||
async with connect(self.db_file) as db: | ||
db.row_factory = Row | ||
if not await self.userExists(user_id): | ||
await self.userAdd(user_id) | ||
return self.cursor.execute( | ||
f"SELECT rowid FROM users WHERE user_id = '%s'" % user_id).fetchone()["rowid"] | ||
async with db.execute( | ||
f"SELECT rowid FROM users WHERE user_id = '%s'" % user_id) as cursor: | ||
async for row in cursor: | ||
return row["rowid"] | ||
|
||
async def deleteJokes(self): | ||
"""Удаление всех шуток""" | ||
with self.connection: | ||
self.cursor.executescript("""DELETE FROM jokes; | ||
DELETE FROM newJokes | ||
async with connect(self.db_file) as db: | ||
await db.executescript("""DELETE FROM jokes; | ||
DELETE FROM newJokes | ||
""") | ||
await db.commit() | ||
|
||
async def deleteJokesUser(self, user_id): | ||
"""Удаление своих шуток""" | ||
with self.connection: | ||
async with connect(self.db_file) as db: | ||
rowid = await self.rowid(user_id) | ||
self.cursor.execute( | ||
await db.execute( | ||
f"DELETE FROM jokes WHERE user_id = '%s'" % rowid) | ||
await db.commit() | ||
|
||
async def dump(self, user_id): | ||
"""Дамп бд""" | ||
with self.connection: | ||
async with connect(self.db_file) as db: | ||
with open(f"{user_id}.sql", "w", encoding='utf 8') as file: | ||
for sql in self.connection.iterdump(): | ||
async for sql in db.iterdump(): | ||
file.write(sql) |