Skip to content

Commit

Permalink
backfill: restart portal creation if interrupted soon after login
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Nov 6, 2024
1 parent 8dc2701 commit 8ae6198
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 13 deletions.
37 changes: 34 additions & 3 deletions pkg/connector/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
"time"

"github.com/rs/zerolog"
Expand All @@ -31,7 +32,12 @@ const historySyncDispatchWait = 30 * time.Second

func (wa *WhatsAppClient) historySyncLoop(ctx context.Context) {
dispatchTimer := time.NewTimer(historySyncDispatchWait)
dispatchTimer.Stop()

if !wa.isNewLogin && wa.UserLogin.Metadata.(*waid.UserLoginMetadata).HistorySyncPortalsNeedCreating {
dispatchTimer.Reset(5 * time.Second)
} else {
dispatchTimer.Stop()
}
wa.UserLogin.Log.Debug().Msg("Starting history sync loop")
for {
select {
Expand Down Expand Up @@ -196,9 +202,12 @@ func (wa *WhatsAppClient) createPortalsFromHistorySync(ctx context.Context) {
}
log.Info().Int("conversation_count", len(conversations)).Msg("Creating portals from history sync")
rateLimitErrors := 0
var wg sync.WaitGroup
wg.Add(len(conversations))
for i := 0; i < len(conversations); i++ {
conv := conversations[i]
if conv.ChatJID == types.StatusBroadcastJID && !wa.Main.Config.EnableStatusBroadcast {
wg.Done()
continue
}
// TODO can the chat info fetch be avoided entirely?
Expand All @@ -207,10 +216,15 @@ func (wa *WhatsAppClient) createPortalsFromHistorySync(ctx context.Context) {
if errors.Is(err, whatsmeow.ErrNotInGroup) {
log.Debug().Stringer("chat_jid", conv.ChatJID).
Msg("Skipping creating room because the user is not a participant")
err = wa.Main.DB.Message.DeleteAllInChat(ctx, wa.UserLogin.ID, conv.ChatJID)
//err = wa.Main.DB.Message.DeleteAllInChat(ctx, wa.UserLogin.ID, conv.ChatJID)
//if err != nil {
// log.Err(err).Msg("Failed to delete historical messages for portal")
//}
err = wa.Main.DB.Conversation.Delete(ctx, wa.UserLogin.ID, conv.ChatJID)
if err != nil {
log.Err(err).Msg("Failed to delete historical messages for portal")
log.Err(err).Msg("Failed to delete conversation user is not in")
}
wg.Done()
continue
} else if errors.Is(err, whatsmeow.ErrIQRateOverLimit) {
rateLimitErrors++
Expand All @@ -222,6 +236,7 @@ func (wa *WhatsAppClient) createPortalsFromHistorySync(ctx context.Context) {
continue
} else if err != nil {
log.Err(err).Stringer("chat_jid", conv.ChatJID).Msg("Failed to get chat info")
wg.Done()
continue
}
wa.Main.Bridge.QueueRemoteEvent(wa.UserLogin, &simplevent.ChatResync{
Expand All @@ -230,12 +245,28 @@ func (wa *WhatsAppClient) createPortalsFromHistorySync(ctx context.Context) {
LogContext: nil,
PortalKey: wa.makeWAPortalKey(conv.ChatJID),
CreatePortal: true,
PostHandleFunc: func(ctx context.Context, portal *bridgev2.Portal) {
err := wa.Main.DB.Conversation.MarkBridged(ctx, wa.UserLogin.ID, conv.ChatJID)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to mark conversation as bridged")
}
wg.Done()
},
},
ChatInfo: wrappedInfo,
LatestMessageTS: conv.LastMessageTimestamp,
})
}
log.Info().Int("conversation_count", len(conversations)).Msg("Finished creating portals from history sync")
go func() {
wg.Wait()
wa.UserLogin.Metadata.(*waid.UserLoginMetadata).HistorySyncPortalsNeedCreating = false
err = wa.UserLogin.Save(ctx)
if err != nil {
log.Err(err).Msg("Failed to save user login history sync portals created flag")
}
log.Info().Msg("Finished processing all history sync chat resync events")
}()
}

func (wa *WhatsAppClient) FetchMessages(ctx context.Context, params bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type WhatsAppClient struct {
mediaRetryLock *semaphore.Weighted

lastPhoneOfflineWarning time.Time
isNewLogin bool
}

var (
Expand Down
3 changes: 3 additions & 0 deletions pkg/connector/login.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ func (wl *WALogin) Wait(ctx context.Context) (*bridgev2.LoginStep, error) {
Metadata: &waid.UserLoginMetadata{
WADeviceID: wl.LoginSuccess.ID.Device,
Timezone: wl.Timezone,

HistorySyncPortalsNeedCreating: true,
},
}, &bridgev2.NewLoginParams{
DeleteOnConflict: true,
Expand All @@ -316,6 +318,7 @@ func (wl *WALogin) Wait(ctx context.Context) (*bridgev2.LoginStep, error) {
return nil, fmt.Errorf("failed to create user login: %w", err)
}

ul.Client.(*WhatsAppClient).isNewLogin = true
err = ul.Client.Connect(ul.Log.WithContext(context.Background()))
if err != nil {
return nil, fmt.Errorf("failed to connect after login: %w", err)
Expand Down
28 changes: 22 additions & 6 deletions pkg/connector/wadb/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Conversation struct {
EphemeralSettingTimestamp *int64
MarkedAsUnread *bool
UnreadCount *uint32
Bridged bool
}

func parseHistoryTime(ts *uint64) time.Time {
Expand Down Expand Up @@ -63,9 +64,10 @@ const (
upsertHistorySyncConversationQuery = `
INSERT INTO whatsapp_history_sync_conversation (
bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time,
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread,
unread_count, bridged
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (bridge_id, user_login_id, chat_jid)
DO UPDATE SET
last_message_timestamp=CASE
Expand All @@ -81,21 +83,24 @@ const (
ephemeral_expiration=COALESCE(excluded.ephemeral_expiration, whatsapp_history_sync_conversation.ephemeral_expiration),
ephemeral_setting_timestamp=COALESCE(excluded.ephemeral_setting_timestamp, whatsapp_history_sync_conversation.ephemeral_setting_timestamp),
marked_as_unread=COALESCE(excluded.marked_as_unread, whatsapp_history_sync_conversation.marked_as_unread),
unread_count=COALESCE(excluded.unread_count, whatsapp_history_sync_conversation.unread_count)
unread_count=COALESCE(excluded.unread_count, whatsapp_history_sync_conversation.unread_count),
bridged=false
`
getRecentConversations = `
SELECT
bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time,
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread,
unread_count, bridged
FROM whatsapp_history_sync_conversation
WHERE bridge_id=$1 AND user_login_id=$2
WHERE bridge_id=$1 AND user_login_id=$2 AND bridged=false
ORDER BY last_message_timestamp DESC
LIMIT $3
`
getConversationByJID = `
SELECT
bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time,
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count
end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread,
unread_count, bridged
FROM whatsapp_history_sync_conversation
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3
`
Expand All @@ -104,6 +109,11 @@ const (
DELETE FROM whatsapp_history_sync_conversation
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3
`
markConversationBridged = `
UPDATE whatsapp_history_sync_conversation
SET bridged=true
WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3
`
)

func (cq *ConversationQuery) Put(ctx context.Context, conv *Conversation) error {
Expand All @@ -120,6 +130,10 @@ func (cq *ConversationQuery) GetRecent(ctx context.Context, loginID networkid.Us
return cq.QueryMany(ctx, getRecentConversations, cq.BridgeID, loginID, limitPtr)
}

func (cq *ConversationQuery) MarkBridged(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID) error {
return cq.Exec(ctx, markConversationBridged, cq.BridgeID, loginID, chatJID)
}

func (cq *ConversationQuery) Get(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID) (*Conversation, error) {
return cq.QueryOne(ctx, getConversationByJID, cq.BridgeID, loginID, chatJID)
}
Expand Down Expand Up @@ -153,6 +167,7 @@ func (c *Conversation) sqlVariables() []any {
c.EphemeralSettingTimestamp,
c.MarkedAsUnread,
c.UnreadCount,
c.Bridged,
}
}

Expand All @@ -171,6 +186,7 @@ func (c *Conversation) Scan(row dbutil.Scannable) (*Conversation, error) {
&c.EphemeralSettingTimestamp,
&c.MarkedAsUnread,
&c.UnreadCount,
&c.Bridged,
)
if err != nil {
return nil, err
Expand Down
10 changes: 6 additions & 4 deletions pkg/connector/wadb/upgrades/00-latest-schema.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- v0 -> v3 (compatible with v3+): Latest revision
-- v0 -> v4 (compatible with v3+): Latest revision

CREATE TABLE whatsapp_poll_option_id (
bridge_id TEXT NOT NULL,
Expand All @@ -13,9 +13,9 @@ CREATE TABLE whatsapp_poll_option_id (
);

CREATE TABLE whatsapp_history_sync_conversation (
bridge_id TEXT NOT NULL,
user_login_id TEXT NOT NULL,
chat_jid TEXT NOT NULL,
bridge_id TEXT NOT NULL,
user_login_id TEXT NOT NULL,
chat_jid TEXT NOT NULL,

last_message_timestamp BIGINT,
archived BOOLEAN,
Expand All @@ -27,6 +27,8 @@ CREATE TABLE whatsapp_history_sync_conversation (
marked_as_unread BOOLEAN,
unread_count INTEGER,

bridged BOOLEAN NOT NULL DEFAULT false,

PRIMARY KEY (bridge_id, user_login_id, chat_jid),
CONSTRAINT whatsapp_history_sync_conversation_user_login_fkey FOREIGN KEY (bridge_id, user_login_id)
REFERENCES user_login (bridge_id, id) ON UPDATE CASCADE ON DELETE CASCADE
Expand Down
2 changes: 2 additions & 0 deletions pkg/connector/wadb/upgrades/04-conversation-bridged-flag.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- v4 (compatible with v3+): Add bridged flag for history sync conversations
ALTER TABLE history_sync_conversation ADD COLUMN bridged BOOLEAN NOT NULL DEFAULT false;
2 changes: 2 additions & 0 deletions pkg/waid/dbmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type UserLoginMetadata struct {
PhoneLastPinged jsontime.Unix `json:"phone_last_pinged"`
Timezone string `json:"timezone"`
PushKeys *PushKeys `json:"push_keys,omitempty"`

HistorySyncPortalsNeedCreating bool `json:"history_sync_portals_need_creating,omitempty"`
}

type PushKeys struct {
Expand Down

0 comments on commit 8ae6198

Please sign in to comment.