Skip to content
This repository has been archived by the owner on Mar 11, 2021. It is now read-only.

WIP: PostgreSQL LISTEN/NOTIFY: invalidate wit cache #2172

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ postgres.connection.maxidle: -1
postgres.connection.maxopen: -1
# Timeout for a transaction in minutes
postgres.transaction.timeout: 5m
# For LISTEN/NOTIFY connections
postgres.listennotify.minreconnectinterval: 10s
postgres.listennotify.maxreconnectinterval: 5m

#------------------------
# HTTP configuration
Expand Down
20 changes: 20 additions & 0 deletions configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ const (
varHeaderMaxLength = "header.maxlength"
varEnvironment = "environment"

// Postgres LISTEN/NOTIFY
varPostgresListenNotifyMinReconnectionInterval = "postgres.listennotify.minreconnectinterval"
varPostgresListenNotifyMaxReconnectionInterval = "postgres.listennotify.maxreconnectinterval"

// cache control settings for a list of resources
varCacheControlWorkItems = "cachecontrol.workitems"
varCacheControlWorkItemEvents = "cachecontrol.workitemevents"
Expand Down Expand Up @@ -180,6 +184,8 @@ func (c *Registry) setConfigDefaults() {
c.v.SetDefault(varPostgresConnectionTimeout, 5)
c.v.SetDefault(varPostgresConnectionMaxIdle, -1)
c.v.SetDefault(varPostgresConnectionMaxOpen, -1)
c.v.SetDefault(varPostgresListenNotifyMinReconnectionInterval, time.Duration(10*time.Second))
c.v.SetDefault(varPostgresListenNotifyMaxReconnectionInterval, time.Duration(5*time.Minute))

// Number of seconds to wait before trying to connect again
c.v.SetDefault(varPostgresConnectionRetrySleep, time.Duration(time.Second))
Expand Down Expand Up @@ -335,6 +341,20 @@ func (c *Registry) GetPostgresConfigString() string {
)
}

// GetPostgresListenNotifyMinReconnectInterval controls the duration to wait
// before trying to re-establish the database connection after connection loss.
// After each consecutive failure this interval is doubled, until
// GetPostgresListenNotifyMaxReconnectInterval is reached.
func (c *Registry) GetPostgresListenNotifyMinReconnectInterval() time.Duration {
return c.v.GetDuration(varPostgresListenNotifyMinReconnectionInterval)
}

// GetPostgresListenNotifyMaxReconnectInterval see
// GetPostgresListenNotifyMinReconnectInterval.
func (c *Registry) GetPostgresListenNotifyMaxReconnectInterval() time.Duration {
return c.v.GetDuration(varPostgresListenNotifyMaxReconnectionInterval)
}

// GetPopulateCommonTypes returns true if the (as set via default, config file, or environment variable)
// the common work item types such as bug or feature shall be created.
func (c *Registry) GetPopulateCommonTypes() bool {
Expand Down
80 changes: 80 additions & 0 deletions gormsupport/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package gormsupport

import (
"time"

"github.com/fabric8-services/fabric8-wit/configuration"
"github.com/fabric8-services/fabric8-wit/log"
"github.com/lib/pq"
errs "github.com/pkg/errors"
)

const (
// ChanSpaceTemplateUpdates is the name for the postgres notification
// channel on which subscribers are informed about updates to the space
// templates (e.g. when a migration has happened).
ChanSpaceTemplateUpdates = "f8_space_template_updates"
)

// A SubscriberFunc describes the function signature that a subscriber needs to
// have. The channel parameter is just an arbitrary identifier string the
// identities a channel. The extra parameter is can contain optional data that
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The extra parameter is can contain

// was sent along with the notification.
type SubscriberFunc func(channel, extra string)

// SetupDatabaseListener sets up a Postgres LISTEN/NOTIFY connection and listens
// on events that we have subscribers for.
func SetupDatabaseListener(config configuration.Registry, subscribers map[string]SubscriberFunc) error {
if len(subscribers) == 0 {
return nil
}

dbConnectCallback := func(ev pq.ListenerEventType, err error) {
switch ev {
case pq.ListenerEventConnected:
log.Logger().Infof("database connection for LISTEN/NOTIFY established successfully")
case pq.ListenerEventDisconnected:
log.Logger().Errorf("lost LISTEN/NOTIFY database connection: %+v", err)
case pq.ListenerEventReconnected:
log.Logger().Infof("database connection for LISTEN/NOTIFY re-established successfully")
case pq.ListenerEventConnectionAttemptFailed:
log.Logger().Errorf("failed to connect to database for LISTEN/NOTIFY: %+v", err)
}
}

listener := pq.NewListener(config.GetPostgresConfigString(), config.GetPostgresListenNotifyMinReconnectInterval(), config.GetPostgresListenNotifyMaxReconnectInterval(), dbConnectCallback)

// listen on every subscribed channel
for channel := range subscribers {
err := listener.Listen(channel)
if err != nil {
log.Logger().Errorf("unable to open connection to database for LISTEN/NOTIFY %v", err)
return errs.Wrapf(err, "failed listen to postgres channel \"%s\"", channel)
}
}

// asynchronously handle notifications
go func() {
for {
select {
case n := <-listener.Notify:
sub, ok := subscribers[n.Channel]
if ok {
log.Logger().Debugf("received notification from postgres channel \"%s\": %s", n.Channel, n.Extra)
sub(n.Channel, n.Extra)
}
case <-time.After(90 * time.Second):
log.Logger().Infof("received no events for 90 seconds, checking connection")
go func() {
err := listener.Ping()
if err != nil {
log.Panic(nil, map[string]interface{}{
"err": err,
}, "failed to ping for LISTEN/NOTIFY database connection")
}
}()
}
}
}()
return nil
}
62 changes: 62 additions & 0 deletions gormsupport/listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package gormsupport_test

import (
"sync"
"testing"

"github.com/fabric8-services/fabric8-wit/gormsupport"
"github.com/fabric8-services/fabric8-wit/gormtestsupport"
"github.com/fabric8-services/fabric8-wit/migration"
"github.com/fabric8-services/fabric8-wit/resource"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

type TestListenerSuite struct {
gormtestsupport.DBTestSuite
}

func TestListener(t *testing.T) {
resource.Require(t, resource.Database)
suite.Run(t, &TestListenerSuite{DBTestSuite: gormtestsupport.NewDBTestSuite()})
}

func (s *TestListenerSuite) TestSetupDatabaseListener() {
s.T().Run("setup listener", func(t *testing.T) {
// given
channelName := "f8_custom_event_channel"
payload := "some additional info about the event"
wg := sync.WaitGroup{}
wg.Add(2)

gormsupport.SetupDatabaseListener(*s.Configuration, map[string]gormsupport.SubscriberFunc{
// This is the channel we send to from this test
channelName: func(channel, extra string) {
t.Logf("received notification on channel %s: %s", channel, extra)
require.Equal(t, channelName, channel)
require.Equal(t, payload, extra)
wg.Done()
},
// This is the channel that we send to from
// migration.PopulateCommonTypes() which is called by
// gormtestsupport.DBTestSuite internally.
gormsupport.ChanSpaceTemplateUpdates: func(channel, extra string) {
t.Logf("received notification on channel %s: %s", channel, extra)
require.Equal(t, gormsupport.ChanSpaceTemplateUpdates, channel)
require.Equal(t, "", extra)
wg.Done()
},
})

// Send a notification from a completely different connection than the
// one we established to listen to channels.
s.DB.Debug().Exec("SELECT pg_notify($1, $2)", channelName, payload)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mental note for myself: I need to check for db.Error here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in b227b9f


// This will send a notification on the
// gormsupport.ChanSpaceTemplateUpdates channel
migration.PopulateCommonTypes(nil, s.DB)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mental note for myself: I need to check for err here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in b227b9f


// wait until notification was received
wg.Wait()
})
}
10 changes: 10 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"runtime"
"time"

"github.com/fabric8-services/fabric8-wit/gormsupport"
"github.com/fabric8-services/fabric8-wit/workitem"

"github.com/fabric8-services/fabric8-wit/closeable"

"github.com/fabric8-services/fabric8-wit/account"
Expand Down Expand Up @@ -140,6 +143,13 @@ func main() {
os.Exit(0)
}

// Ensure we delete the work item cache when we receive a notification from postgres
gormsupport.SetupDatabaseListener(*config, map[string]gormsupport.SubscriberFunc{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when SetupDatabaseListener function return an error? The error value is not handled here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I will have a look when this PR becomes more relevant.

gormsupport.ChanSpaceTemplateUpdates: func(channel, extra string) {
workitem.ClearGlobalWorkItemTypeCache()
},
})

// Make sure the database is populated with the correct types (e.g. bug etc.)
if config.GetPopulateCommonTypes() {
ctx := migration.NewMigrationContext(context.Background())
Expand Down
12 changes: 12 additions & 0 deletions migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"text/template"

"github.com/fabric8-services/fabric8-wit/errors"
"github.com/fabric8-services/fabric8-wit/gormsupport"
"github.com/fabric8-services/fabric8-wit/log"
"github.com/fabric8-services/fabric8-wit/ptr"
"github.com/fabric8-services/fabric8-wit/space"
Expand Down Expand Up @@ -674,5 +675,16 @@ func PopulateCommonTypes(ctx context.Context, db *gorm.DB) error {
log.Debug(ctx, nil, `imported space template #%d "%s"`, idx, t.Template.Name)
}
workitem.ClearGlobalWorkItemTypeCache() // Clear the WIT cache after updating existing WITs

// Ensure the WIT cache is cleared in all pods
db = db.Exec("SELECT pg_notify($1, '')", gormsupport.ChanSpaceTemplateUpdates)
if db.Error != nil {
log.Error(ctx, map[string]interface{}{
"err": db.Error,
"channel": gormsupport.ChanSpaceTemplateUpdates,
}, `failed to notify postgres event subscribers about template updates`)
return errs.Wrapf(db.Error, `failed to notify postgres event subscribers about template updates`)
}

return nil
}