From c8000cb6cd1750262499b5235012c57f5783c978 Mon Sep 17 00:00:00 2001 From: im-adithya Date: Fri, 20 Dec 2024 14:21:21 +0530 Subject: [PATCH] chore: remove all mutexes and store in subscription struct --- internal/nostr/models.go | 49 ++++++++++++++++++++-------------------- internal/nostr/nostr.go | 42 ++++++++++++---------------------- 2 files changed, 39 insertions(+), 52 deletions(-) diff --git a/internal/nostr/models.go b/internal/nostr/models.go index 756f3b6..adb04d1 100644 --- a/internal/nostr/models.go +++ b/internal/nostr/models.go @@ -25,30 +25,31 @@ const ( ) type Subscription struct { - ID uint - RelayUrl string - WebhookUrl string - PushToken string - IsIOS bool - Open bool - Ids *[]string `gorm:"-"` - Kinds *[]int `gorm:"-"` - Authors *[]string `gorm:"-"` // WalletPubkey is included in this - Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag - Since time.Time - Until time.Time - Limit int - Search string - CreatedAt time.Time - UpdatedAt time.Time - Uuid string `gorm:"type:uuid;default:gen_random_uuid()"` - EventChan chan *nostr.Event `gorm:"-"` - RequestEvent *RequestEvent `gorm:"-"` - - IdsJson json.RawMessage `gorm:"type:jsonb"` - KindsJson json.RawMessage `gorm:"type:jsonb"` - AuthorsJson json.RawMessage `gorm:"type:jsonb"` - TagsJson json.RawMessage `gorm:"type:jsonb"` + ID uint + RelayUrl string + WebhookUrl string + PushToken string + IsIOS bool + Open bool + Ids *[]string `gorm:"-"` + Kinds *[]int `gorm:"-"` + Authors *[]string `gorm:"-"` // WalletPubkey is included in this + Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag + Since time.Time + Until time.Time + Limit int + Search string + CreatedAt time.Time + UpdatedAt time.Time + Uuid string `gorm:"type:uuid;default:gen_random_uuid()"` + EventChan chan *nostr.Event `gorm:"-"` + RequestEvent *RequestEvent `gorm:"-"` + RelaySubscription *nostr.Subscription `gorm:"-"` + + IdsJson json.RawMessage `gorm:"type:jsonb"` + KindsJson json.RawMessage `gorm:"type:jsonb"` + AuthorsJson json.RawMessage `gorm:"type:jsonb"` + TagsJson json.RawMessage `gorm:"type:jsonb"` } func (s *Subscription) BeforeSave(tx *gorm.DB) error { diff --git a/internal/nostr/nostr.go b/internal/nostr/nostr.go index 48b68ff..da521e1 100644 --- a/internal/nostr/nostr.go +++ b/internal/nostr/nostr.go @@ -47,7 +47,6 @@ type Service struct { Relay *nostr.Relay Cfg *Config Logger *logrus.Logger - subscriptions map[string]*nostr.Subscription subscriptionsMutex sync.Mutex relayMutex sync.Mutex client *expo.PushClient @@ -116,8 +115,6 @@ func NewService(ctx context.Context) (*Service, error) { return nil, err } - subscriptions := make(map[string]*nostr.Subscription) - client := expo.NewPushClient(&expo.ClientConfig{ Host: "https://api.expo.dev", APIURL: "/v2", @@ -131,7 +128,6 @@ func NewService(ctx context.Context) (*Service, error) { Wg: &wg, Logger: logger, Relay: relay, - subscriptions: subscriptions, client: client, } @@ -685,15 +681,11 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error { } func (svc *Service) stopSubscription(subscription *Subscription) error { - svc.subscriptionsMutex.Lock() - sub, exists := svc.subscriptions[subscription.Uuid] - if exists { - sub.Unsub() - delete(svc.subscriptions, subscription.Uuid) + if subscription.RelaySubscription != nil { + subscription.RelaySubscription.Unsub() } - svc.subscriptionsMutex.Unlock() - if (!exists && !subscription.Open) { + if (!subscription.Open) { return errors.New(SUBSCRIPTION_ALREADY_CLOSED) } @@ -739,7 +731,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri continue } - sub, err := relay.Subscribe(ctx, []nostr.Filter{*filter}) + relaySubscription, err := relay.Subscribe(ctx, []nostr.Filter{*filter}) if err != nil { // TODO: notify user about subscription failure waitToReconnectSeconds = max(waitToReconnectSeconds, 1) @@ -751,9 +743,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri continue } - svc.subscriptionsMutex.Lock() - svc.subscriptions[subscription.Uuid] = sub - svc.subscriptionsMutex.Unlock() + subscription.RelaySubscription = relaySubscription svc.Logger.WithFields(logrus.Fields{ "subscription_id": subscription.ID, @@ -791,10 +781,8 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subscription) { walletPubkey, clientPubkey := getPubkeys(subscription) - svc.subscriptionsMutex.Lock() - sub := svc.subscriptions[subscription.Uuid] - svc.subscriptionsMutex.Unlock() - err := sub.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent) + relaySubscription := subscription.RelaySubscription + err := relaySubscription.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent) if err != nil { // TODO: notify user about publish failure svc.Logger.WithError(err).WithFields(logrus.Fields{ @@ -804,7 +792,7 @@ func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subsc "client_pubkey": clientPubkey, }).Error("Failed to publish to relay") subscription.RequestEvent.State = REQUEST_EVENT_PUBLISH_FAILED - sub.Unsub() + relaySubscription.Unsub() } else { svc.Logger.WithFields(logrus.Fields{ "request_event_id": subscription.RequestEvent.NostrId, @@ -860,15 +848,13 @@ func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subs } func (svc *Service) processEvents(ctx context.Context, subscription *Subscription, onReceiveEOS OnReceiveEOSFunc, handleEvent HandleEventFunc) error { - svc.subscriptionsMutex.Lock() - sub := svc.subscriptions[subscription.Uuid] - svc.subscriptionsMutex.Unlock() + relaySubscription := subscription.RelaySubscription go func(){ // block till EOS is received for nip 47 handlers // only if request event is not yet published if (onReceiveEOS != nil && subscription.RequestEvent.State != REQUEST_EVENT_PUBLISH_CONFIRMED) { - <-sub.EndOfStoredEvents + <-relaySubscription.EndOfStoredEvents svc.Logger.WithFields(logrus.Fields{ "subscription_id": subscription.ID, "relay_url": subscription.RelayUrl, @@ -878,7 +864,7 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio } // loop through incoming events - for event := range sub.Events { + for event := range relaySubscription.Events { go handleEvent(event, subscription) } @@ -889,11 +875,11 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio }() select { - case <-sub.Relay.Context().Done(): - return sub.Relay.ConnectionError + case <-relaySubscription.Relay.Context().Done(): + return relaySubscription.Relay.ConnectionError case <-ctx.Done(): return nil - case <-sub.Context.Done(): + case <-relaySubscription.Context.Done(): return nil } }