From 3ad2369abfc1446bdc67466eda6a8cf7398ea7a6 Mon Sep 17 00:00:00 2001 From: Nikita Karpukhin Date: Sun, 21 Jul 2024 13:41:18 +0200 Subject: [PATCH] fix a deadlock and some other bugs --- app/distortioner.go | 3 +++ app/handler_helpers.go | 1 + app/queue/honest_priority_queue.go | 22 +++++++++------------- app/tools/video_worker.go | 5 ++++- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/app/distortioner.go b/app/distortioner.go index 6e72d26..5b854b2 100644 --- a/app/distortioner.go +++ b/app/distortioner.go @@ -344,6 +344,9 @@ func main() { priorityChatsStr := strings.Split(os.Getenv("DISTORTIONER_PRIORITY_CHATS"), ",") priorityChats := make([]int64, len(priorityChatsStr)) for i, s := range priorityChatsStr { + if s == "" { + continue + } priorityChats[i], err = strconv.ParseInt(s, 10, 64) if err != nil { logger.Fatal(err) diff --git a/app/handler_helpers.go b/app/handler_helpers.go index 0af1674..7205d10 100644 --- a/app/handler_helpers.go +++ b/app/handler_helpers.go @@ -137,6 +137,7 @@ func (d DistorterBot) SendMessage(c tb.Context, toSend interface{}, method Metho b.Reply(message, NotEnoughRights) case strings.Contains(err.Error(), "bot was blocked by the user (403)"): d.videoWorker.BanUser(message.Chat.ID) + return nil, nil case strings.Contains(err.Error(), "telegram: Bad Request: message to be replied not found (400)"): return d.SendMessage(c, toSend, Send) } diff --git a/app/queue/honest_priority_queue.go b/app/queue/honest_priority_queue.go index 12c01ec..39acd5d 100644 --- a/app/queue/honest_priority_queue.go +++ b/app/queue/honest_priority_queue.go @@ -76,20 +76,11 @@ func (hjq *HonestJobQueue) Pop() *Job { job := heap.Pop(&hjq.queue).(*Job) - if _, ok := hjq.banned[job.userID]; ok { - allBannedJobsExhausted := true - for _, queued := range hjq.queue { - if queued.userID == job.userID { - allBannedJobsExhausted = false - break - } + for _, ok := hjq.banned[job.userID]; ok; { + if hjq.queue.Len() == 0 { + return nil } - - if allBannedJobsExhausted { - delete(hjq.banned, job.userID) - } - - return hjq.Pop() + job = heap.Pop(&hjq.queue).(*Job) } hjq.users[job.userID]-- @@ -134,6 +125,11 @@ func (hjq *HonestJobQueue) Push(userID int64, runnable func()) error { return errors.New("You're distorting videos too often, wait until the previous ones have been processed") } + // if a user sent us a message then we're clearly unbanned + if _, ok := hjq.banned[userID]; ok { + delete(hjq.banned, userID) + } + hjq.users[userID] = priority + 1 job := newJob(userID, priority, runnable) diff --git a/app/tools/video_worker.go b/app/tools/video_worker.go index 04caa96..b4b6fdb 100644 --- a/app/tools/video_worker.go +++ b/app/tools/video_worker.go @@ -29,7 +29,10 @@ func (vw *VideoWorker) BanUser(userID int64) { func (vw *VideoWorker) run() { for range vw.messenger { - vw.queue.Pop().Run() + job := vw.queue.Pop() + if job != nil { + job.Run() + } } }