Skip to content

Commit

Permalink
otel: add opentelemetry traces
Browse files Browse the repository at this point in the history
  • Loading branch information
develop7 committed Dec 5, 2024
1 parent da0f48e commit 6afc7cd
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 61 deletions.
2 changes: 1 addition & 1 deletion cabal.project.freeze
Original file line number Diff line number Diff line change
@@ -1 +1 @@
index-state: hackage.haskell.org 2024-05-17T23:41:49Z
index-state: hackage.haskell.org 2024-11-25T13:43:26Z
65 changes: 65 additions & 0 deletions nix/overlays/haskell-packages.nix
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,71 @@ let
(prev.postgresql-libpq_0_10_1_0.override {
postgresql = super.libpq;
});

hs-opentelemetry-sdk = lib.dontCheck (prev.callHackageDirect
{
pkg = "hs-opentelemetry-sdk";
ver = "0.1.0.0";
sha256 = "sha256-kg6iYyEW2a/qb7FFXbph/xKPFW/6Wqhl5P9NZotgbVs=";
}
{ });
hs-opentelemetry-propagator-datadog = lib.dontCheck (prev.callHackageDirect
{
pkg = "hs-opentelemetry-propagator-datadog";
ver = "0.0.1.0";
sha256 = "sha256-V2FOsdyrR3X44FILTRpDIDNghc5vPIDx7z0CUGyJXQk=";
}
{ });
hs-opentelemetry-exporter-otlp = lib.dontCheck (prev.callHackageDirect
{
pkg = "hs-opentelemetry-exporter-otlp";
ver = "0.1.0.0";
sha256 = "sha256-Y0ihGMDIu3GcN7wkjnth4z72WfyBUmqSNrJEvoGxi6M=";
}
{ });
hs-opentelemetry-instrumentation-wai = lib.dontCheck (prev.callHackageDirect
{
pkg = "hs-opentelemetry-instrumentation-wai";
ver = "0.1.1.0";
sha256 = "sha256-9jz06jEOAfuDtk7RS7cntCDPmORukeS7hHYP04vxGXA=";
}
{ });
hs-opentelemetry-api = lib.dontCheck (prev.callHackageDirect
{
pkg = "hs-opentelemetry-api";
ver = "0.2.0.0";
sha256 = "sha256-IgyI6J9ZiN9x0A/Jdp9fsdhJTqX3AJyTLlKmk8hFsTk=";
}
{ });
hs-opentelemetry-propagator-b3 = lib.dontCheck (prev.callHackageDirect
{
pkg = "hs-opentelemetry-propagator-b3";
ver = "0.0.1.2";
sha256 = "sha256-hUk4f/xngG5NujSJGGb7lWawNE6EAbvw/8krKsGGsPY=";
}
{ });
hs-opentelemetry-propagator-w3c = lib.dontCheck (prev.callHackageDirect
{
pkg = "hs-opentelemetry-propagator-w3c";
ver = "0.0.1.4";
sha256 = "sha256-Rq+bcerTD4Pqzr1sznvoOtkKanlV+0Blq3EXXN2HQuU=";
}
{ });
hs-opentelemetry-otlp = lib.dontCheck (prev.callHackageDirect
{
pkg = "hs-opentelemetry-otlp";
ver = "0.1.0.0";
sha256 = "sha256-xZFIlyx2BKnwo6XCblCZTukNsjv/uG5T3u8uKlKJ1yc=";
}
{ });
hs-opentelemetry-utils-exceptions = lib.dontCheck (prev.callHackageDirect
{
pkg = "hs-opentelemetry-utils-exceptions";
ver = "0.2.0.1";
sha256 = "sha256-gukjbleRa4PKWcyBXC1J0kSQzohF5Or+ayvp5wxrzT0=";
}
{ });

};
in
{
Expand Down
5 changes: 5 additions & 0 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ library
PostgREST.Query.QueryBuilder
PostgREST.Query.SqlFragment
PostgREST.Query.Statements
PostgREST.OpenTelemetry
PostgREST.Plan
PostgREST.Plan.CallPlan
PostgREST.Plan.MutatePlan
Expand Down Expand Up @@ -115,6 +116,9 @@ library
, hasql-transaction >= 1.0.1 && < 1.1
, heredoc >= 0.2 && < 0.3
, http-types >= 0.12.2 && < 0.13
, hs-opentelemetry-sdk >= 0.1.0.0 && < 0.2.0.0
, hs-opentelemetry-instrumentation-wai
, hs-opentelemetry-utils-exceptions
, insert-ordered-containers >= 0.2.2 && < 0.3
, iproute >= 1.7.0 && < 1.8
, jose-jwt >= 0.9.6 && < 0.11
Expand Down Expand Up @@ -262,6 +266,7 @@ test-suite spec
, hasql-pool >= 1.0.1 && < 1.1
, hasql-transaction >= 1.0.1 && < 1.1
, heredoc >= 0.2 && < 0.3
, hs-opentelemetry-sdk >= 0.1.0.0 && < 0.2.0.0
, hspec >= 2.3 && < 2.12
, hspec-wai >= 0.10 && < 0.12
, hspec-wai-json >= 0.10 && < 0.12
Expand Down
76 changes: 42 additions & 34 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,27 @@ import qualified PostgREST.Response as Response
import qualified PostgREST.Unix as Unix (installSignalHandlers)

import PostgREST.ApiRequest (ApiRequest (..))
import PostgREST.AppState (AppState)
import PostgREST.AppState (AppState, getOTelTracer)
import PostgREST.Auth (AuthResult (..))
import PostgREST.Config (AppConfig (..), LogLevel (..))
import PostgREST.Config.PgVersion (PgVersion (..))
import PostgREST.Error (Error)
import PostgREST.Error (Error (..))
import PostgREST.Network (resolveHost)
import PostgREST.Observation (Observation (..))
import PostgREST.Response.Performance (ServerTiming (..),
serverTimingHeader)
import PostgREST.SchemaCache (SchemaCache (..))
import PostgREST.Version (docsVersion, prettyVersion)

import qualified Data.ByteString.Char8 as BS
import qualified Data.List as L
import qualified Network.HTTP.Types as HTTP
import qualified Network.Socket as NS
import Protolude hiding (Handler)
import System.TimeIt (timeItT)
import qualified Data.ByteString.Char8 as BS
import qualified Data.List as L
import qualified Network.HTTP.Types as HTTP
import qualified Network.Socket as NS
import OpenTelemetry.Instrumentation.Wai (newOpenTelemetryWaiMiddleware)
import OpenTelemetry.Trace (defaultSpanArguments)
import OpenTelemetry.Utils.Exceptions (inSpanM)
import Protolude hiding (Handler)
import System.TimeIt (timeItT)

type Handler = ExceptT Error

Expand All @@ -84,7 +87,9 @@ run appState = do
host <- resolveHost $ AppState.getSocketREST appState
observer $ AppServerPortObs (fromJust host) port

Warp.runSettingsSocket (serverSettings conf) (AppState.getSocketREST appState) app
oTelMWare <- newOpenTelemetryWaiMiddleware

Warp.runSettingsSocket (serverSettings conf) (AppState.getSocketREST appState) (oTelMWare app)

serverSettings :: AppConfig -> Warp.Settings
serverSettings AppConfig{..} =
Expand All @@ -102,27 +107,28 @@ postgrest logLevel appState connWorker =
Logger.middleware logLevel Auth.getRole $
-- fromJust can be used, because the auth middleware will **always** add
-- some AuthResult to the vault.
\req respond -> case fromJust $ Auth.getResult req of
Left err -> respond $ Error.errorResponseFor err
Right authResult -> do
appConf <- AppState.getConfig appState -- the config must be read again because it can reload
maybeSchemaCache <- AppState.getSchemaCache appState
pgVer <- AppState.getPgVersion appState

let
eitherResponse :: IO (Either Error Wai.Response)
eitherResponse =
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req

response <- either Error.errorResponseFor identity <$> eitherResponse
-- Launch the connWorker when the connection is down. The postgrest
-- function can respond successfully (with a stale schema cache) before
-- the connWorker is done.
when (isServiceUnavailable response) connWorker
resp <- do
delay <- AppState.getNextDelay appState
return $ addRetryHint delay response
respond resp
\req respond -> inSpanM (getOTelTracer appState) "respond" defaultSpanArguments $
case fromJust $ Auth.getResult req of
Left err -> respond $ Error.errorResponseFor err
Right authResult -> do
appConf <- AppState.getConfig appState -- the config must be read again because it can reload
maybeSchemaCache <- AppState.getSchemaCache appState
pgVer <- AppState.getPgVersion appState

let
eitherResponse :: IO (Either Error Wai.Response)
eitherResponse = inSpanM (getOTelTracer appState) "eitherResponse" defaultSpanArguments $
runExceptT $ postgrestResponse appState appConf maybeSchemaCache pgVer authResult req

response <- either Error.errorResponseFor identity <$> eitherResponse
-- Launch the connWorker when the connection is down. The postgrest
-- function can respond successfully (with a stale schema cache) before
-- the connWorker is done.
when (isServiceUnavailable response) connWorker
resp <- do
delay <- AppState.getNextDelay appState
return $ addRetryHint delay response
respond resp

postgrestResponse
:: AppState.AppState
Expand All @@ -144,10 +150,10 @@ postgrestResponse appState conf@AppConfig{..} maybeSchemaCache pgVer authResult@

let jwtTime = if configServerTimingEnabled then Auth.getJwtDur req else Nothing

(parseTime, apiReq@ApiRequest{..}) <- withTiming $ liftEither . mapLeft Error.ApiRequestError $ ApiRequest.userApiRequest conf req body sCache
(planTime, plan) <- withTiming $ liftEither $ Plan.actionPlan iAction conf apiReq sCache
(queryTime, queryResult) <- withTiming $ Query.runQuery appState conf authResult apiReq plan sCache pgVer (Just authRole /= configDbAnonRole)
(respTime, resp) <- withTiming $ liftEither $ Response.actionResponse queryResult apiReq (T.decodeUtf8 prettyVersion, docsVersion) conf sCache iSchema iNegotiatedByProfile
(parseTime, apiReq@ApiRequest{..}) <- withOTel "parse" $ withTiming $ liftEither . mapLeft Error.ApiRequestError $ ApiRequest.userApiRequest conf req body sCache
(planTime, plan) <- withOTel "plan" $ withTiming $ liftEither $ Plan.actionPlan iAction conf apiReq sCache
(queryTime, queryResult) <- withOTel "query" $ withTiming $ Query.runQuery appState conf authResult apiReq plan sCache pgVer (Just authRole /= configDbAnonRole)
(respTime, resp) <- withOTel "response" $ withTiming $ liftEither $ Response.actionResponse queryResult apiReq (T.decodeUtf8 prettyVersion, docsVersion) conf sCache iSchema iNegotiatedByProfile

return $ toWaiResponse (ServerTiming jwtTime parseTime planTime queryTime respTime) resp

Expand All @@ -164,6 +170,8 @@ postgrestResponse appState conf@AppConfig{..} maybeSchemaCache pgVer authResult@
r <- f
pure (Nothing, r)

withOTel label = inSpanM (getOTelTracer appState) label defaultSpanArguments

traceHeaderMiddleware :: AppState -> Wai.Middleware
traceHeaderMiddleware appState app req respond = do
conf <- AppState.getConfig appState
Expand Down
18 changes: 13 additions & 5 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module PostgREST.AppState
, getJwtCache
, getSocketREST
, getSocketAdmin
, getOTelTracer
, init
, initSockets
, initWithPool
Expand Down Expand Up @@ -76,6 +77,7 @@ import PostgREST.Unix (createAndBindDomainSocket)

import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP)
import Data.String (IsString (..))
import OpenTelemetry.Trace (Tracer)
import Protolude

data AuthResult = AuthResult
Expand Down Expand Up @@ -116,6 +118,8 @@ data AppState = AppState
, stateObserver :: ObservationHandler
, stateLogger :: Logger.LoggerState
, stateMetrics :: Metrics.MetricsState
-- | OpenTelemetry tracer
, oTelTracer :: Tracer
}

-- | Schema cache status
Expand All @@ -126,8 +130,8 @@ data SchemaCacheStatus

type AppSockets = (NS.Socket, Maybe NS.Socket)

init :: AppConfig -> IO AppState
init conf@AppConfig{configLogLevel, configDbPoolSize} = do
init :: AppConfig -> Tracer -> IO AppState
init conf@AppConfig{configLogLevel, configDbPoolSize} tracer = do
loggerState <- Logger.init
metricsState <- Metrics.init configDbPoolSize
let observer = liftA2 (>>) (Logger.observationLogger loggerState configLogLevel) (Metrics.observationMetrics metricsState)
Expand All @@ -136,11 +140,11 @@ init conf@AppConfig{configLogLevel, configDbPoolSize} = do

pool <- initPool conf observer
(sock, adminSock) <- initSockets conf
state' <- initWithPool (sock, adminSock) pool conf loggerState metricsState observer
state' <- initWithPool (sock, adminSock) pool conf loggerState metricsState tracer observer
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock}

initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> Logger.LoggerState -> Metrics.MetricsState -> ObservationHandler -> IO AppState
initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> Logger.LoggerState -> Metrics.MetricsState -> Tracer -> ObservationHandler -> IO AppState
initWithPool (sock, adminSock) pool conf loggerState metricsState tracer observer = do

appState <- AppState pool
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
Expand All @@ -159,6 +163,7 @@ initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
<*> pure observer
<*> pure loggerState
<*> pure metricsState
<*> pure tracer

deb <-
let decisecond = 100000 in
Expand Down Expand Up @@ -325,6 +330,9 @@ getSocketREST = stateSocketREST
getSocketAdmin :: AppState -> Maybe NS.Socket
getSocketAdmin = stateSocketAdmin

getOTelTracer :: AppState -> Tracer
getOTelTracer = oTelTracer

getMainThreadId :: AppState -> ThreadId
getMainThreadId = stateMainThreadId

Expand Down
16 changes: 8 additions & 8 deletions src/PostgREST/CLI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,29 @@ import qualified Options.Applicative as O

import Text.Heredoc (str)

import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))
import PostgREST.Observation (Observation (..))
import PostgREST.SchemaCache (querySchemaCache)
import PostgREST.Version (prettyVersion)
import PostgREST.AppState (AppState)
import PostgREST.Config (AppConfig (..))
import PostgREST.Observation (Observation (..))
import PostgREST.OpenTelemetry (withTracer)
import PostgREST.SchemaCache (querySchemaCache)
import PostgREST.Version (prettyVersion)

import qualified PostgREST.App as App
import qualified PostgREST.AppState as AppState
import qualified PostgREST.Config as Config

import Protolude


main :: CLI -> IO ()
main CLI{cliCommand, cliPath} = do
main CLI{cliCommand, cliPath} = withTracer "PostgREST" $ \tracer -> do
conf@AppConfig{..} <-
either panic identity <$> Config.readAppConfig mempty cliPath Nothing mempty mempty

-- Per https://github.com/PostgREST/postgrest/issues/268, we want to
-- explicitly close the connections to PostgreSQL on shutdown.
-- 'AppState.destroy' takes care of that.
bracket
(AppState.init conf)
(AppState.init conf tracer)
AppState.destroy
(\appState -> case cliCommand of
CmdDumpConfig -> do
Expand Down
22 changes: 22 additions & 0 deletions src/PostgREST/OpenTelemetry.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module PostgREST.OpenTelemetry (withTracer) where

import OpenTelemetry.Attributes (emptyAttributes)
import OpenTelemetry.Trace (InstrumentationLibrary (..), Tracer,
initializeGlobalTracerProvider,
makeTracer, shutdownTracerProvider,
tracerOptions)
import PostgREST.Version (prettyVersion)
import Protolude

withTracer :: Text -> (Tracer -> IO c) -> IO c
withTracer label f = bracket
initializeGlobalTracerProvider
shutdownTracerProvider
(\tracerProvider -> f $ makeTracer tracerProvider instrumentationLibrary tracerOptions)
where
instrumentationLibrary =
InstrumentationLibrary
{ libraryName = label
, libraryVersion = decodeUtf8 prettyVersion
, librarySchemaUrl = ""
, libraryAttributes = emptyAttributes}
13 changes: 13 additions & 0 deletions stack-21.7.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,16 @@ extra-deps:
- hasql-notifications-0.2.2.0
- hasql-pool-1.0.1
- postgresql-libpq-0.10.1.0

- hs-opentelemetry-sdk-0.1.0.0@sha256:2642851866f11a494c99f15202d4bd9e75d4a5e1a7f3f172742a0676a33c664f,4059
- hs-opentelemetry-api-0.2.0.0@sha256:bbdbe7e212e99f17a7e68d09b94c1a6613e50ce88b3cb1b68979bbb0221291ae,4051
- hs-opentelemetry-exporter-otlp-0.1.0.0@sha256:4c908a7e2e5053879687b7a7ee6e40a8eb22868e1a0808cd0cfd6ac9905057b8,1526
- hs-opentelemetry-instrumentation-wai-0.1.1.0@sha256:d97b4cb3870217e64e95da3f51db814eca62eb57484ee0a6f747366da5940bc2,1371
- hs-opentelemetry-propagator-b3-0.0.1.2@sha256:8815dd74f27a908b5be0729cc09a3bf9f3049481c982252bbd6c3f6b908ecfcd,1340
- hs-opentelemetry-propagator-datadog-0.0.1.0@sha256:c85de95e3c33b3ffcf980f560166e960cab0888e0741315f487288b3653c007c,2950
- hs-opentelemetry-propagator-w3c-0.0.1.4@sha256:251428754454fbaf71d9b6acbbea473014b1ab50bdcda8bc8fe1532e63193374,1382
- hs-opentelemetry-utils-exceptions-0.2.0.1@sha256:b32c3109b896dbab67c74c28e8ffcfe6e7f86aa29454fc6a31c06a671246e78b,1477
- hs-opentelemetry-otlp-0.1.0.0@sha256:5cd096b15f26f51ffae4c18f6a26794daef801acc9e13033db8b21a7606336d4,2533
- thread-utils-context-0.3.0.4@sha256:e763da1c6cab3b6d378fb670ca74aa9bf03c9b61b6fcf7628c56363fb0e3e71e,1671
- thread-utils-finalizers-0.1.1.0@sha256:24944b71d9f1d01695a5908b4a3b44838fab870883114a323336d537995e0a5b,1381
- unix-compat-0.7.3@sha256:b23220ab66f6ab2bedeec964152fef48c1f00f33dc911a59dde842d1d8fd2e05,3244
Loading

0 comments on commit 6afc7cd

Please sign in to comment.