diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go index aee9b5b9..8325e1f5 100644 --- a/pkg/connector/backfill.go +++ b/pkg/connector/backfill.go @@ -7,6 +7,7 @@ import ( "slices" "strconv" "strings" + "sync" "time" "github.com/rs/zerolog" @@ -31,7 +32,12 @@ const historySyncDispatchWait = 30 * time.Second func (wa *WhatsAppClient) historySyncLoop(ctx context.Context) { dispatchTimer := time.NewTimer(historySyncDispatchWait) - dispatchTimer.Stop() + + if !wa.isNewLogin && wa.UserLogin.Metadata.(*waid.UserLoginMetadata).HistorySyncPortalsNeedCreating { + dispatchTimer.Reset(5 * time.Second) + } else { + dispatchTimer.Stop() + } wa.UserLogin.Log.Debug().Msg("Starting history sync loop") for { select { @@ -196,9 +202,12 @@ func (wa *WhatsAppClient) createPortalsFromHistorySync(ctx context.Context) { } log.Info().Int("conversation_count", len(conversations)).Msg("Creating portals from history sync") rateLimitErrors := 0 + var wg sync.WaitGroup + wg.Add(len(conversations)) for i := 0; i < len(conversations); i++ { conv := conversations[i] if conv.ChatJID == types.StatusBroadcastJID && !wa.Main.Config.EnableStatusBroadcast { + wg.Done() continue } // TODO can the chat info fetch be avoided entirely? @@ -207,10 +216,15 @@ func (wa *WhatsAppClient) createPortalsFromHistorySync(ctx context.Context) { if errors.Is(err, whatsmeow.ErrNotInGroup) { log.Debug().Stringer("chat_jid", conv.ChatJID). Msg("Skipping creating room because the user is not a participant") - err = wa.Main.DB.Message.DeleteAllInChat(ctx, wa.UserLogin.ID, conv.ChatJID) + //err = wa.Main.DB.Message.DeleteAllInChat(ctx, wa.UserLogin.ID, conv.ChatJID) + //if err != nil { + // log.Err(err).Msg("Failed to delete historical messages for portal") + //} + err = wa.Main.DB.Conversation.Delete(ctx, wa.UserLogin.ID, conv.ChatJID) if err != nil { - log.Err(err).Msg("Failed to delete historical messages for portal") + log.Err(err).Msg("Failed to delete conversation user is not in") } + wg.Done() continue } else if errors.Is(err, whatsmeow.ErrIQRateOverLimit) { rateLimitErrors++ @@ -222,6 +236,7 @@ func (wa *WhatsAppClient) createPortalsFromHistorySync(ctx context.Context) { continue } else if err != nil { log.Err(err).Stringer("chat_jid", conv.ChatJID).Msg("Failed to get chat info") + wg.Done() continue } wa.Main.Bridge.QueueRemoteEvent(wa.UserLogin, &simplevent.ChatResync{ @@ -230,12 +245,28 @@ func (wa *WhatsAppClient) createPortalsFromHistorySync(ctx context.Context) { LogContext: nil, PortalKey: wa.makeWAPortalKey(conv.ChatJID), CreatePortal: true, + PostHandleFunc: func(ctx context.Context, portal *bridgev2.Portal) { + err := wa.Main.DB.Conversation.MarkBridged(ctx, wa.UserLogin.ID, conv.ChatJID) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to mark conversation as bridged") + } + wg.Done() + }, }, ChatInfo: wrappedInfo, LatestMessageTS: conv.LastMessageTimestamp, }) } log.Info().Int("conversation_count", len(conversations)).Msg("Finished creating portals from history sync") + go func() { + wg.Wait() + wa.UserLogin.Metadata.(*waid.UserLoginMetadata).HistorySyncPortalsNeedCreating = false + err = wa.UserLogin.Save(ctx) + if err != nil { + log.Err(err).Msg("Failed to save user login history sync portals created flag") + } + log.Info().Msg("Finished processing all history sync chat resync events") + }() } func (wa *WhatsAppClient) FetchMessages(ctx context.Context, params bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) { diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 89050047..07f406e7 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -98,6 +98,7 @@ type WhatsAppClient struct { mediaRetryLock *semaphore.Weighted lastPhoneOfflineWarning time.Time + isNewLogin bool } var ( diff --git a/pkg/connector/login.go b/pkg/connector/login.go index a52665c6..5ae9fc55 100644 --- a/pkg/connector/login.go +++ b/pkg/connector/login.go @@ -308,6 +308,8 @@ func (wl *WALogin) Wait(ctx context.Context) (*bridgev2.LoginStep, error) { Metadata: &waid.UserLoginMetadata{ WADeviceID: wl.LoginSuccess.ID.Device, Timezone: wl.Timezone, + + HistorySyncPortalsNeedCreating: true, }, }, &bridgev2.NewLoginParams{ DeleteOnConflict: true, @@ -316,6 +318,7 @@ func (wl *WALogin) Wait(ctx context.Context) (*bridgev2.LoginStep, error) { return nil, fmt.Errorf("failed to create user login: %w", err) } + ul.Client.(*WhatsAppClient).isNewLogin = true err = ul.Client.Connect(ul.Log.WithContext(context.Background())) if err != nil { return nil, fmt.Errorf("failed to connect after login: %w", err) diff --git a/pkg/connector/wadb/conversation.go b/pkg/connector/wadb/conversation.go index 48fd4ce6..3141f3c5 100644 --- a/pkg/connector/wadb/conversation.go +++ b/pkg/connector/wadb/conversation.go @@ -30,6 +30,7 @@ type Conversation struct { EphemeralSettingTimestamp *int64 MarkedAsUnread *bool UnreadCount *uint32 + Bridged bool } func parseHistoryTime(ts *uint64) time.Time { @@ -63,9 +64,10 @@ const ( upsertHistorySyncConversationQuery = ` INSERT INTO whatsapp_history_sync_conversation ( bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time, - end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count + end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, + unread_count, bridged ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (bridge_id, user_login_id, chat_jid) DO UPDATE SET last_message_timestamp=CASE @@ -81,21 +83,24 @@ const ( ephemeral_expiration=COALESCE(excluded.ephemeral_expiration, whatsapp_history_sync_conversation.ephemeral_expiration), ephemeral_setting_timestamp=COALESCE(excluded.ephemeral_setting_timestamp, whatsapp_history_sync_conversation.ephemeral_setting_timestamp), marked_as_unread=COALESCE(excluded.marked_as_unread, whatsapp_history_sync_conversation.marked_as_unread), - unread_count=COALESCE(excluded.unread_count, whatsapp_history_sync_conversation.unread_count) + unread_count=COALESCE(excluded.unread_count, whatsapp_history_sync_conversation.unread_count), + bridged=false ` getRecentConversations = ` SELECT bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time, - end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count + end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, + unread_count, bridged FROM whatsapp_history_sync_conversation - WHERE bridge_id=$1 AND user_login_id=$2 + WHERE bridge_id=$1 AND user_login_id=$2 AND bridged=false ORDER BY last_message_timestamp DESC LIMIT $3 ` getConversationByJID = ` SELECT bridge_id, user_login_id, chat_jid, last_message_timestamp, archived, pinned, mute_end_time, - end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, unread_count + end_of_history_transfer_type, ephemeral_expiration, ephemeral_setting_timestamp, marked_as_unread, + unread_count, bridged FROM whatsapp_history_sync_conversation WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3 ` @@ -104,6 +109,11 @@ const ( DELETE FROM whatsapp_history_sync_conversation WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3 ` + markConversationBridged = ` + UPDATE whatsapp_history_sync_conversation + SET bridged=true + WHERE bridge_id=$1 AND user_login_id=$2 AND chat_jid=$3 + ` ) func (cq *ConversationQuery) Put(ctx context.Context, conv *Conversation) error { @@ -120,6 +130,10 @@ func (cq *ConversationQuery) GetRecent(ctx context.Context, loginID networkid.Us return cq.QueryMany(ctx, getRecentConversations, cq.BridgeID, loginID, limitPtr) } +func (cq *ConversationQuery) MarkBridged(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID) error { + return cq.Exec(ctx, markConversationBridged, cq.BridgeID, loginID, chatJID) +} + func (cq *ConversationQuery) Get(ctx context.Context, loginID networkid.UserLoginID, chatJID types.JID) (*Conversation, error) { return cq.QueryOne(ctx, getConversationByJID, cq.BridgeID, loginID, chatJID) } @@ -153,6 +167,7 @@ func (c *Conversation) sqlVariables() []any { c.EphemeralSettingTimestamp, c.MarkedAsUnread, c.UnreadCount, + c.Bridged, } } @@ -171,6 +186,7 @@ func (c *Conversation) Scan(row dbutil.Scannable) (*Conversation, error) { &c.EphemeralSettingTimestamp, &c.MarkedAsUnread, &c.UnreadCount, + &c.Bridged, ) if err != nil { return nil, err diff --git a/pkg/connector/wadb/upgrades/00-latest-schema.sql b/pkg/connector/wadb/upgrades/00-latest-schema.sql index 9165e729..dbc41d12 100644 --- a/pkg/connector/wadb/upgrades/00-latest-schema.sql +++ b/pkg/connector/wadb/upgrades/00-latest-schema.sql @@ -1,4 +1,4 @@ --- v0 -> v3 (compatible with v3+): Latest revision +-- v0 -> v4 (compatible with v3+): Latest revision CREATE TABLE whatsapp_poll_option_id ( bridge_id TEXT NOT NULL, @@ -13,9 +13,9 @@ CREATE TABLE whatsapp_poll_option_id ( ); CREATE TABLE whatsapp_history_sync_conversation ( - bridge_id TEXT NOT NULL, - user_login_id TEXT NOT NULL, - chat_jid TEXT NOT NULL, + bridge_id TEXT NOT NULL, + user_login_id TEXT NOT NULL, + chat_jid TEXT NOT NULL, last_message_timestamp BIGINT, archived BOOLEAN, @@ -27,6 +27,8 @@ CREATE TABLE whatsapp_history_sync_conversation ( marked_as_unread BOOLEAN, unread_count INTEGER, + bridged BOOLEAN NOT NULL DEFAULT false, + PRIMARY KEY (bridge_id, user_login_id, chat_jid), CONSTRAINT whatsapp_history_sync_conversation_user_login_fkey FOREIGN KEY (bridge_id, user_login_id) REFERENCES user_login (bridge_id, id) ON UPDATE CASCADE ON DELETE CASCADE diff --git a/pkg/connector/wadb/upgrades/04-conversation-bridged-flag.sql b/pkg/connector/wadb/upgrades/04-conversation-bridged-flag.sql new file mode 100644 index 00000000..3880692f --- /dev/null +++ b/pkg/connector/wadb/upgrades/04-conversation-bridged-flag.sql @@ -0,0 +1,2 @@ +-- v4 (compatible with v3+): Add bridged flag for history sync conversations +ALTER TABLE history_sync_conversation ADD COLUMN bridged BOOLEAN NOT NULL DEFAULT false; diff --git a/pkg/waid/dbmeta.go b/pkg/waid/dbmeta.go index 3ebadb2d..e60a1c0d 100644 --- a/pkg/waid/dbmeta.go +++ b/pkg/waid/dbmeta.go @@ -33,6 +33,8 @@ type UserLoginMetadata struct { PhoneLastPinged jsontime.Unix `json:"phone_last_pinged"` Timezone string `json:"timezone"` PushKeys *PushKeys `json:"push_keys,omitempty"` + + HistorySyncPortalsNeedCreating bool `json:"history_sync_portals_need_creating,omitempty"` } type PushKeys struct {