Skip to content

Commit

Permalink
Extract important queries to postgres functions
Browse files Browse the repository at this point in the history
The motivating use case for extracting these functions is that it
gives more control to the developer using this package on how to
deploy this to their infrastructure.

For instance, some people may be interested in a different table
structure for `payloads`, like having it partitioned. With this
change, developers are free to change the implementation of the
postgres functions as they see fit as long as they implement an
equivalent logic and return the same types.
  • Loading branch information
lorenzo committed Jul 28, 2021
1 parent 49f4cba commit c4ff38e
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 65 deletions.
53 changes: 45 additions & 8 deletions hasql-queue.cabal
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
cabal-version: 1.12

-- This file has been generated from package.yaml by hpack version 0.31.2.
-- This file has been generated from package.yaml by hpack version 0.34.4.
--
-- see: https://github.com/sol/hpack
--
-- hash: 956ae93525f9dafcc0c9c8149cd2bbc8cfcfe4e63310adec92ce40f995e4cbf4
-- hash: 30a78bb71c0fb6470ad0d6b6788b23f19801ab253d1c65e008a48e329e01b914

name: hasql-queue
version: 1.2.0.1
version: 1.3.0.0
synopsis: A PostgreSQL backed queue
description: A PostgreSQL backed queue. Please see README.md
category: Web
Expand All @@ -18,7 +18,8 @@ maintainer: [email protected]
copyright: 2020 Jonathan Fischoff
license: BSD3
license-file: LICENSE
tested-with: GHC ==8.8.1
tested-with:
GHC ==8.8.1
build-type: Simple
extra-source-files:
README.md
Expand All @@ -42,7 +43,16 @@ library
Paths_hasql_queue
hs-source-dirs:
src
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
default-extensions:
OverloadedStrings
LambdaCase
RecordWildCards
TupleSections
GeneralizedNewtypeDeriving
QuasiQuotes
ScopedTypeVariables
TypeApplications
AllowAmbiguousTypes
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls
build-depends:
aeson
Expand All @@ -67,7 +77,16 @@ executable benchmark
Paths_hasql_queue
hs-source-dirs:
benchmarks
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
default-extensions:
OverloadedStrings
LambdaCase
RecordWildCards
TupleSections
GeneralizedNewtypeDeriving
QuasiQuotes
ScopedTypeVariables
TypeApplications
AllowAmbiguousTypes
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N
build-depends:
aeson
Expand Down Expand Up @@ -98,7 +117,16 @@ executable hasql-queue-tmp-db
Paths_hasql_queue
hs-source-dirs:
hasql-queue-tmp-db
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
default-extensions:
OverloadedStrings
LambdaCase
RecordWildCards
TupleSections
GeneralizedNewtypeDeriving
QuasiQuotes
ScopedTypeVariables
TypeApplications
AllowAmbiguousTypes
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N -g2
build-depends:
aeson
Expand Down Expand Up @@ -137,7 +165,16 @@ test-suite unit-tests
Paths_hasql_queue
hs-source-dirs:
test
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
default-extensions:
OverloadedStrings
LambdaCase
RecordWildCards
TupleSections
GeneralizedNewtypeDeriving
QuasiQuotes
ScopedTypeVariables
TypeApplications
AllowAmbiguousTypes
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N
build-depends:
aeson
Expand Down
2 changes: 1 addition & 1 deletion package.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: hasql-queue
version: '1.2.0.2'
version: '1.3.0.0'
synopsis: A PostgreSQL backed queue
description: A PostgreSQL backed queue. Please see README.md
category: Web
Expand Down
23 changes: 3 additions & 20 deletions src/Hasql/Queue/High/ExactlyOnce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,30 +58,13 @@ dequeue valueDecoder count
| count <= 0 = pure []
| otherwise = do
let multipleQuery = [here|
DELETE FROM payloads
WHERE id in
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT $1
)
RETURNING value
SELECT value FROM dequeue_payload($1)
|]

multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4

singleQuery = [here|
DELETE FROM payloads
WHERE id =
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING value
SELECT value FROM dequeue_payload(1)
|]

singleEncoder = mempty
Expand Down
42 changes: 8 additions & 34 deletions src/Hasql/Queue/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ newtype PayloadId = PayloadId { unPayloadId :: Int64 }
data Payload a = Payload
{ pId :: PayloadId
, pState :: State
-- TODO do I need this?
, pAttempts :: Int
, pModifiedAt :: Int
-- TODO rename. I don't need this either.
, pValue :: a
} deriving (Show, Eq)

Expand All @@ -75,8 +71,6 @@ payloadDecoder thePayloadDecoder
= Payload
<$> payloadIdRow
<*> D.column (D.nonNullable stateDecoder)
<*> D.column (D.nonNullable $ fromIntegral <$> D.int4)
<*> D.column (D.nonNullable $ fromIntegral <$> D.int4)
<*> D.column (D.nonNullable thePayloadDecoder)

payloadIdEncoder :: E.Value PayloadId
Expand All @@ -92,9 +86,7 @@ payloadIdRow = D.column (D.nonNullable payloadIdDecoder)
enqueuePayload :: E.Value a -> [a] -> Session [PayloadId]
enqueuePayload theEncoder values = do
let theQuery = [here|
INSERT INTO payloads (attempts, value)
SELECT 0, * FROM unnest($1)
RETURNING id
SELECT id FROM enqueue_payload($1)
|]
encoder = E.param $ E.nonNullable $ E.foldableArray $ E.nonNullable theEncoder
decoder = D.rowList (D.column (D.nonNullable payloadIdDecoder))
Expand All @@ -105,30 +97,15 @@ enqueuePayload theEncoder values = do
dequeuePayload :: D.Value a -> Int -> Session [Payload a]
dequeuePayload valueDecoder count = do
let multipleQuery = [here|
DELETE FROM payloads
WHERE id in
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT $1
)
RETURNING id, state, attempts, modified_at, value
SELECT id, state, value
FROM dequeue_payload($1)
|]

multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4

singleQuery = [here|
DELETE FROM payloads
WHERE id =
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, state, attempts, modified_at, value
SELECT id, state, value
FROM dequeue_payload(1)
|]

singleEncoder = mempty
Expand All @@ -144,7 +121,7 @@ dequeuePayload valueDecoder count = do
getPayload :: D.Value a -> PayloadId -> Session (Maybe (Payload a))
getPayload decoder payloadId = do
let theQuery = [here|
SELECT id, state, attempts, modified_at, value
SELECT id, state, value
FROM payloads
WHERE id = $1
|]
Expand All @@ -168,10 +145,7 @@ getCount = do
incrementAttempts :: Int -> [PayloadId] -> Session ()
incrementAttempts retryCount pids = do
let theQuery = [here|
UPDATE payloads
SET state=CASE WHEN attempts >= $1 THEN 'failed' :: state_t ELSE 'enqueued' END
, attempts=attempts+1
WHERE id = ANY($2)
SELECT increment_payload_attempts($1, $2)
|]
encoder = (fst >$< E.param (E.nonNullable E.int4)) <>
(snd >$< E.param (E.nonNullable $ E.foldableArray $ E.nonNullable payloadIdEncoder))
Expand Down
64 changes: 64 additions & 0 deletions src/Hasql/Queue/Migrate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,36 @@ migrationQueryString valueType = [i|
CREATE INDEX IF NOT EXISTS active_modified_at_idx ON payloads USING btree (modified_at)
WHERE (state = 'enqueued');

CREATE OR REPLACE FUNCTION dequeue_payload(limit_ INT) RETURNS SETOF payloads AS
$$
WITH available AS
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT limit_
)
DELETE FROM payloads
USING available
WHERE payloads.id = available.id
RETURNING payloads.*
$$ LANGUAGE SQL VOLATILE;

CREATE OR REPLACE FUNCTION increment_payload_attempts(threshold_ INT, ids_ BIGINT[]) RETURNS VOID AS
$$
UPDATE payloads
SET state=CASE WHEN attempts >= threshold_ THEN 'failed' :: state_t ELSE 'enqueued' END
, attempts=attempts+1
WHERE id = ANY(ids_)
$$ LANGUAGE SQL VOLATILE;

CREATE OR REPLACE FUNCTION enqueue_payload(values_ ${valueType}[]) RETURNS SETOF payloads AS
$$
INSERT INTO payloads (attempts, value)
SELECT 0, * FROM unnest(values_)
RETURNING *
$$ LANGUAGE SQL VOLATILE;
|]

{-| This function creates a table and enumeration type that is
Expand Down Expand Up @@ -106,6 +136,37 @@ migrationQueryString valueType = [i|
CREATE INDEX IF NOT EXISTS active_modified_at_idx ON payloads USING btree (modified_at, state)
WHERE (state = 'enqueued');
CREATE OR REPLACE FUNCTION dequeue_payload(limit_ INT) RETURNS SETOF payloads AS
$$
WITH available AS
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT limit_
)
DELETE FROM payloads
USING available
WHERE payloads.id = available.id
RETURNING payloads.*
$$ LANGUAGE SQL VOLATILE;
CREATE OR REPLACE FUNCTION increment_payload_attempts(threshold_ INT, ids_ BIGINT[]) RETURNS VOID AS
$$
UPDATE payloads
SET state=CASE WHEN attempts >= threshold_ THEN 'failed' :: state_t ELSE 'enqueued' END
, attempts=attempts+1
WHERE id = ANY(ids_)
$$ LANGUAGE SQL VOLATILE;
CREATE OR REPLACE FUNCTION enqueue_payload(values_ ${valueType}[]) RETURNS SETOF payloads AS
$$
INSERT INTO payloads (attempts, value)
SELECT 0, * FROM unnest(values_)
RETURNING *
$$ LANGUAGE SQL VOLATILE;
@
The @VALUE_TYPE@ needs to passed in through the second argument.
Expand All @@ -123,6 +184,9 @@ Drop everything created by 'migrate'
teardown :: Connection -> IO ()
teardown conn = do
let theQuery = [i|
DROP FUNCTION IF EXISTS enqueue_payload;
DROP FUNCTION IF EXISTS dequeue_payload;
DROP FUNCTION IF EXISTS increment_payload_attempts;
DROP TABLE IF EXISTS payloads;
DROP TYPE IF EXISTS state_t;
DROP SEQUENCE IF EXISTS modified_index;
Expand Down
4 changes: 2 additions & 2 deletions test/Hasql/Queue/Low/AtLeastOnceSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe "
let Just decoded = mapM (decode . encode) xs
sort decoded `shouldBe` sort expected

it "enqueue returns a PayloadId that cooresponds to the entry it added" $ withConnection $ \conn -> do
it "enqueue returns a PayloadId that corresponds to the entry it added" $ withConnection $ \conn -> do
[payloadId] <- I.runThrow (I.enqueuePayload E.int4 [1]) conn
Just actual <- getPayload conn D.int4 payloadId

pValue actual `shouldBe` 1
pId actual `shouldBe` payloadId

0 comments on commit c4ff38e

Please sign in to comment.