-
-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy pathcog.py
228 lines (193 loc) · 8.05 KB
/
cog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""
Cog for repost detection.
"""
# stolen from rubbergoddess
import asyncio
import time
import dhash
import disnake
from disnake.ext import commands
import utils
from cogs.base import Base
from database.image import ImageDB
from rubbergod import Rubbergod
from utils.checks import PermissionsCheck
from . import features
from .messages_cz import MessagesCZ
# NOTE(review): presumably pins dhash to its PIL image backend for every hash
# computed by this cog — confirm against the dhash documentation.
dhash.force_pil()
class Warden(Base, commands.Cog):
    """Repost detection cog.

    Compares perceptual hashes (dhash) of uploaded attachments against the
    hashes stored in ``ImageDB`` and announces probable reposts in the channel.
    """

    def __init__(self, bot: Rubbergod):
        super().__init__()
        self.bot = bot

        # Hamming-distance thresholds (inclusive) used to grade a match,
        # from "certain repost" down to "maybe a repost".
        self.limit_full = 3
        self.limit_hard = 7
        self.limit_soft = 14

        self.message_channel = None

    def doCheckRepost(self, message: disnake.Message):
        """Return True when the message should take part in repost detection.

        That is the case for messages with at least one attachment, written by
        a non-bot author in one of the configured deduplication channels.
        """
        # NOTE(review): `self.config` is not set in this class; presumably it
        # is provided by `Base` — confirm.
        return (
            message.channel.id in self.config.deduplication_channels
            and bool(message.attachments)
            and not message.author.bot
        )

    @commands.Cog.listener()
    async def on_message(self, message: disnake.Message):
        """Run the duplicate check on every eligible new message."""
        if self.doCheckRepost(message):
            await self.checkDuplicate(message)

    @commands.Cog.listener()
    async def on_message_delete(self, message: disnake.Message):
        """Drop stored hashes of a deleted message and remove its repost embed."""
        if not self.doCheckRepost(message):
            return

        ImageDB.deleteByMessage(message.id)

        # The repost embed is sent right after the offending message, so it is
        # enough to look at the few messages that follow it.
        following = await message.channel.history(after=message, limit=10, oldest_first=True).flatten()
        for mess in following:
            if not mess.author.bot or not mess.embeds:
                continue
            # The embed footer carries the ID of the message it was sent for.
            if str(message.id) != mess.embeds[0].footer.text:
                continue
            try:
                await mess.delete()
            except disnake.NotFound:
                # Somebody else removed it in the meantime; keep looking.
                continue
            break

    async def handle_reaction(self, ctx):
        """Delete duplicate embed if original is not a duplicate"""
        message = ctx.message

        # Votes of ignored users do not count; take the reaction back.
        if ctx.member.id in self.config.repost_ignore_users:
            await message.remove_reaction("❎", ctx.member)
            return

        for react in message.reactions:
            if react.emoji != "❎" or react.count < self.config.duplicate_limit:
                continue

            # Enough users disagreed: clear the bot's marks on the flagged
            # message (its ID is stored in the embed footer) ...
            try:
                orig = message.embeds[0].footer.text
                orig = await message.channel.fetch_message(int(orig))
                await orig.remove_reaction("♻️", self.bot.user)
                await orig.remove_reaction("🤷🏻", self.bot.user)
                await orig.remove_reaction("🤔", self.bot.user)
            except Exception as e:
                # Best effort only; the embed is removed regardless.
                # NOTE(review): the argument layout assumes a project-specific
                # logger signature — confirm it is not a plain logging.Logger.
                self.bot.logger.warning("Warden:handle_reaction", "Could not remove bot's emote", e)

            # ... and remove the announcement embed itself.
            try:
                await message.delete()
            except disnake.errors.NotFound:
                pass

    # The docstrings of the commands below are left untouched on purpose:
    # the command framework displays them as help text.
    @commands.group()
    @PermissionsCheck.is_bot_admin()
    async def scan(self, ctx: commands.Context):
        """Scan for reposts"""
        if ctx.invoked_subcommand is None:
            await ctx.send_help(ctx.invoked_with)

    @commands.guild_only()
    @commands.max_concurrency(1, per=commands.BucketType.default, wait=False)
    @commands.bot_has_permissions(read_message_history=True)
    @scan.command(name="history", brief=MessagesCZ.scan_brief)
    async def scan_history(self, ctx: commands.Context, limit: int | str):
        """Scan current channel for images and save them as hashes
        limit: [all | <int>]
        """
        # Validate the parameter: either the literal "all" or a positive int.
        if limit != "all":
            try:
                limit = int(limit)
                if limit < 1:
                    raise ValueError
            except ValueError:
                raise commands.BadArgument("Expected 'all' or positive integer")

        history_limit = None if limit == "all" else limit
        messages = await ctx.channel.history(limit=history_limit).flatten()
        total = len(messages)

        title = "**INITIATING...**\n\nLoaded {} messages"
        await asyncio.sleep(0.5)
        template = (
            "**SCANNING IN PROGRESS**\n\n"
            "Processed **{}** of **{}** messages ({:.1f} %)\n"
            "Computed **{}** hashes"
        )

        # `title` and `template` are plain format strings, so they have to be
        # rendered with str.format(); calling them raises TypeError.
        msg = await ctx.send(title.format(total))

        ctr_hashes = 0
        now = time.time()
        for i, message in enumerate(messages):
            # Refresh the progress message on every 50th message only, to
            # keep the number of edits low.
            if i % 50 == 0:
                await msg.edit(content=template.format(i, total, i / total * 100, ctr_hashes))

            if not message.attachments:
                continue

            hashes = [x async for x in features.saveMessageHashes(message)]
            ctr_hashes += len(hashes)

        elapsed = time.time() - now
        await msg.edit(
            content="**SCAN COMPLETE**\n\n"
            f"Processed **{total}** messages.\n"
            f"Computed **{ctr_hashes}** hashes in {elapsed:.1f} seconds."
        )

    @scan.command(name="message")
    async def scan_message(self, ctx: commands.Context, link):
        """Scan message attachments in whole database"""
        # TODO: implement
        pass

    async def checkDuplicate(self, message: disnake.Message):
        """Check if uploaded files are known"""
        hashes = [x async for x in features.saveMessageHashes(message)]
        if not hashes:
            # Nothing to compare, so there is no need to load the database.
            return

        posts = ImageDB.getAll()

        # Closest stored post for each uploaded image -> its Hamming distance.
        duplicates = {}
        for img_hash in hashes:
            # Only a distance strictly below 128 is accepted as a candidate.
            hamming_min = 128
            duplicate = None
            for post in posts:
                # The message's own hashes are already stored; skip them.
                if post.message_id == message.id:
                    continue

                # Stored hashes are hexadecimal strings.
                post_hash = int(post.dhash, 16)
                hamming = dhash.get_num_bits_different(img_hash, post_hash)
                if hamming < hamming_min:
                    duplicate = post
                    hamming_min = hamming

            if duplicate is not None:
                duplicates[duplicate] = hamming_min

        for duplicate, hamming_min in duplicates.items():
            if hamming_min <= self.limit_soft:
                await self._announceDuplicate(message, duplicate, hamming_min)

    async def _announceDuplicate(self, message: disnake.Message, original: ImageDB, hamming: int):
        """Announce that a message is a probable repost.

        message: the freshly posted message that looks like a repost
        original: database entry of the closest previously seen image
        hamming: Hamming distance between the image and closest database entry
        """
        if hamming <= self.limit_full:
            title = "**♻️ To je repost!**"
            reaction = "♻️"
        elif hamming <= self.limit_hard:
            title = "**♻️ To je asi repost**"
            reaction = "🤔"
        else:
            title = "To je možná repost"
            reaction = "🤷🏻"
        prob = "{:.1f} %".format((1 - hamming / 128) * 100)
        timestamp = utils.general.id_to_datetime(original.attachment_id).strftime("%Y-%m-%d %H:%M:%S")

        # Fallback texts, used whenever the original post cannot be reached
        # (guild not available, channel gone, or message deleted).
        link = "404 <:sadcat:576171980118687754>"
        author = "_??? (404)_"

        guild = self.bot.get_guild(self.config.guild_id)
        src_chan = guild.get_channel(original.channel_id) if guild is not None else None
        if src_chan is not None:
            try:
                src_post = await src_chan.fetch_message(original.message_id)
            except disnake.NotFound:
                pass  # keep the 404 fallback
            else:
                link = src_post.jump_url
                author = disnake.utils.escape_markdown(src_post.author.display_name)

        desc = MessagesCZ.repost_description(user=message.author.id, value=prob)
        embed = disnake.Embed(title=title, color=0xCB410B, description=desc, url=message.jump_url)
        embed.add_field(name=f"**{author}**, {timestamp}", value=link, inline=False)
        embed.add_field(
            name=MessagesCZ.repost_title,
            value=MessagesCZ.repost_content(limit=self.config.duplicate_limit),
        )
        # The footer stores the ID of the flagged message so the embed can be
        # matched back to it later.
        embed.set_footer(text=message.id)
        send = await message.channel.send(embed=embed)

        try:
            await message.add_reaction(reaction)
        except disnake.errors.NotFound:
            # The flagged message vanished in the meantime; retract the embed.
            await send.delete()
            return

        await send.add_reaction("❎")