Skip to content

Commit

Permalink
chore: remove all mutexes and store in subscription struct
Browse files Browse the repository at this point in the history
  • Loading branch information
im-adithya committed Dec 20, 2024
1 parent dd8e695 commit 192ad37
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 52 deletions.
49 changes: 25 additions & 24 deletions internal/nostr/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,31 @@ const (
)

type Subscription struct {
ID uint
RelayUrl string
WebhookUrl string
PushToken string
IsIOS bool
Open bool
Ids *[]string `gorm:"-"`
Kinds *[]int `gorm:"-"`
Authors *[]string `gorm:"-"` // WalletPubkey is included in this
Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag
Since time.Time
Until time.Time
Limit int
Search string
CreatedAt time.Time
UpdatedAt time.Time
Uuid string `gorm:"type:uuid;default:gen_random_uuid()"`
EventChan chan *nostr.Event `gorm:"-"`
RequestEvent *RequestEvent `gorm:"-"`

IdsJson json.RawMessage `gorm:"type:jsonb"`
KindsJson json.RawMessage `gorm:"type:jsonb"`
AuthorsJson json.RawMessage `gorm:"type:jsonb"`
TagsJson json.RawMessage `gorm:"type:jsonb"`
ID uint
RelayUrl string
WebhookUrl string
PushToken string
IsIOS bool
Open bool
Ids *[]string `gorm:"-"`
Kinds *[]int `gorm:"-"`
Authors *[]string `gorm:"-"` // WalletPubkey is included in this
Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag
Since time.Time
Until time.Time
Limit int
Search string
CreatedAt time.Time
UpdatedAt time.Time
Uuid string `gorm:"type:uuid;default:gen_random_uuid()"`
EventChan chan *nostr.Event `gorm:"-"`
RequestEvent *RequestEvent `gorm:"-"`
RelaySubscription *nostr.Subscription `gorm:"-"`

IdsJson json.RawMessage `gorm:"type:jsonb"`
KindsJson json.RawMessage `gorm:"type:jsonb"`
AuthorsJson json.RawMessage `gorm:"type:jsonb"`
TagsJson json.RawMessage `gorm:"type:jsonb"`
}

func (s *Subscription) BeforeSave(tx *gorm.DB) error {
Expand Down
42 changes: 14 additions & 28 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type Service struct {
Relay *nostr.Relay
Cfg *Config
Logger *logrus.Logger
subscriptions map[string]*nostr.Subscription
subscriptionsMutex sync.Mutex
relayMutex sync.Mutex
client *expo.PushClient
Expand Down Expand Up @@ -116,8 +115,6 @@ func NewService(ctx context.Context) (*Service, error) {
return nil, err
}

subscriptions := make(map[string]*nostr.Subscription)

client := expo.NewPushClient(&expo.ClientConfig{
Host: "https://api.expo.dev",
APIURL: "/v2",
Expand All @@ -131,7 +128,6 @@ func NewService(ctx context.Context) (*Service, error) {
Wg: &wg,
Logger: logger,
Relay: relay,
subscriptions: subscriptions,
client: client,
}

Expand Down Expand Up @@ -685,15 +681,11 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error {
}

func (svc *Service) stopSubscription(subscription *Subscription) error {
svc.subscriptionsMutex.Lock()
sub, exists := svc.subscriptions[subscription.Uuid]
if exists {
sub.Unsub()
delete(svc.subscriptions, subscription.Uuid)
if subscription.RelaySubscription != nil {
subscription.RelaySubscription.Unsub()
}
svc.subscriptionsMutex.Unlock()

if (!exists && !subscription.Open) {
if (!subscription.Open) {
return errors.New(SUBSCRIPTION_ALREADY_CLOSED)
}

Expand Down Expand Up @@ -739,7 +731,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
continue
}

sub, err := relay.Subscribe(ctx, []nostr.Filter{*filter})
relaySubscription, err := relay.Subscribe(ctx, []nostr.Filter{*filter})
if err != nil {
// TODO: notify user about subscription failure
waitToReconnectSeconds = max(waitToReconnectSeconds, 1)
Expand All @@ -751,9 +743,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
continue
}

svc.subscriptionsMutex.Lock()
svc.subscriptions[subscription.Uuid] = sub
svc.subscriptionsMutex.Unlock()
subscription.RelaySubscription = relaySubscription

svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
Expand Down Expand Up @@ -791,10 +781,8 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subscription) {
walletPubkey, clientPubkey := getPubkeys(subscription)

svc.subscriptionsMutex.Lock()
sub := svc.subscriptions[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
err := sub.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent)
relaySubscription := subscription.RelaySubscription
err := relaySubscription.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent)
if err != nil {
// TODO: notify user about publish failure
svc.Logger.WithError(err).WithFields(logrus.Fields{
Expand All @@ -804,7 +792,7 @@ func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subsc
"client_pubkey": clientPubkey,
}).Error("Failed to publish to relay")
subscription.RequestEvent.State = REQUEST_EVENT_PUBLISH_FAILED
sub.Unsub()
relaySubscription.Unsub()
} else {
svc.Logger.WithFields(logrus.Fields{
"request_event_id": subscription.RequestEvent.NostrId,
Expand Down Expand Up @@ -860,15 +848,13 @@ func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subs
}

func (svc *Service) processEvents(ctx context.Context, subscription *Subscription, onReceiveEOS OnReceiveEOSFunc, handleEvent HandleEventFunc) error {
svc.subscriptionsMutex.Lock()
sub := svc.subscriptions[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
relaySubscription := subscription.RelaySubscription

go func(){
// block till EOS is received for nip 47 handlers
// only if request event is not yet published
if (onReceiveEOS != nil && subscription.RequestEvent.State != REQUEST_EVENT_PUBLISH_CONFIRMED) {
<-sub.EndOfStoredEvents
<-relaySubscription.EndOfStoredEvents
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
Expand All @@ -878,7 +864,7 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
}

// loop through incoming events
for event := range sub.Events {
for event := range relaySubscription.Events {
go handleEvent(event, subscription)
}

Expand All @@ -889,11 +875,11 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
}()

select {
case <-sub.Relay.Context().Done():
return sub.Relay.ConnectionError
case <-relaySubscription.Relay.Context().Done():
return relaySubscription.Relay.ConnectionError
case <-ctx.Done():
return nil
case <-sub.Context.Done():
case <-relaySubscription.Context.Done():
return nil
}
}
Expand Down

0 comments on commit 192ad37

Please sign in to comment.