Aelve Codesearch

grep over package repositories
influxdb-1.7.1
src/Database/InfluxDB/Query.hs
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE ViewPatterns #-}
module Database.InfluxDB.Query
  (
  -- * Query interface
    Query
  , query
  , queryChunked

  -- * Query parameters
  , QueryParams
  , queryParams
  , server
  , database
  , precision
  , manager

  -- * Parsing results
  , QueryResults(..)
  , parseResultsWith

  -- * Low-level functions
  , withQueryResponse

  -- * Re-exports from tagged
  , Tagged(..)
  , untag
  ) where
import Control.Exception
import Control.Monad
import Data.Char
import Data.List
import Data.Proxy
import GHC.TypeLits

import Control.Lens
import Data.Aeson
import Data.Optional (Optional(..), optional)
import Data.Tagged
import Data.Vector (Vector)
import Data.Void
import qualified Control.Foldl as L
import qualified Data.Aeson.Parser as A
import qualified Data.Aeson.Types as A
import qualified Data.Attoparsec.ByteString as AB
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text.Encoding as TE
import qualified Data.Text as T
import qualified Data.Vector as V
import qualified Network.HTTP.Client as HC
import qualified Network.HTTP.Types as HT

import Database.InfluxDB.JSON
import Database.InfluxDB.Types as Types
import qualified Database.InfluxDB.Format as F

-- $setup
-- >>> :set -XOverloadedStrings
-- >>> :set -XRecordWildCards
-- >>> import Data.Time (UTCTime)

-- | Types that can be converted from an JSON object returned by InfluxDB.
--
-- For example the @h2o_feet@ series in
-- [the official document](https://docs.influxdata.com/influxdb/v1.2/query_language/data_exploration/)
-- can be encoded as follows:
--
-- >>> :{
-- data H2OFeet = H2OFeet
--   { time :: UTCTime
--   , levelDesc :: T.Text
--   , location :: T.Text
--   , waterLevel :: Double
--   }
-- instance QueryResults H2OFeet where
--   parseResults prec = parseResultsWith $ \_ _ columns fields -> do
--     time <- getField "time" columns fields >>= parseUTCTime prec
--     levelDesc <- getField "level_description" columns fields >>= parseJSON
--     location <- getField "location" columns fields >>= parseJSON
--     waterLevel <- getField "water_level" columns fields >>= parseJSON
--     return H2OFeet {..}
-- :}
class QueryResults a where
  -- | Parse a JSON object as an array of values of expected type.
  parseResults
    :: Precision 'QueryRequest
    -> Value
    -> A.Parser (Vector a)

instance QueryResults Void where
  parseResults _ = A.withObject "error" $ \obj -> obj .:? "error"
    >>= maybe (pure V.empty) (withText "error" $ fail . T.unpack)

fieldName :: KnownSymbol k => proxy k -> T.Text
fieldName = T.pack . symbolVal

-- | One-off type for non-timestamped measurements
--
-- >>> let p = queryParams "_internal"
-- >>> dbs <- query p "SHOW DATABASES" :: IO (V.Vector (Tagged "name" T.Text))
-- >>> find ((== "_internal") . untag) dbs
-- Just (Tagged "_internal")
instance (KnownSymbol k, FromJSON v) => QueryResults (Tagged k v) where
  parseResults _ = parseResultsWith $ \_ _ columns fields ->
    getField (fieldName (Proxy :: Proxy k)) columns fields >>= parseJSON

-- | One-off tuple for sigle-field measurements
instance
  ( KnownSymbol k1, FromJSON v1
  , KnownSymbol k2, FromJSON v2 )
  => QueryResults (Tagged k1 v1, Tagged k2 v2) where
    parseResults _ = parseResultsWith $ \_ _ columns fields -> do
      v1 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k1)) columns fields
      v2 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k2)) columns fields
      return (v1, v2)

-- | One-off tuple for two-field measurements
instance
  ( KnownSymbol k1, FromJSON v1
  , KnownSymbol k2, FromJSON v2
  , KnownSymbol k3, FromJSON v3 )
  => QueryResults (Tagged k1 v1, Tagged k2 v2, Tagged k3 v3) where
    parseResults _ = parseResultsWith $ \_ _ columns fields -> do
      v1 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k1)) columns fields
      v2 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k2)) columns fields
      v3 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k3)) columns fields
      return (v1, v2, v3)

-- | One-off tuple for three-field measurements
instance
  ( KnownSymbol k1, FromJSON v1
  , KnownSymbol k2, FromJSON v2
  , KnownSymbol k3, FromJSON v3
  , KnownSymbol k4, FromJSON v4 )
  => QueryResults (Tagged k1 v1, Tagged k2 v2, Tagged k3 v3, Tagged k4 v4) where
    parseResults _ = parseResultsWith $ \_ _ columns fields -> do
      v1 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k1)) columns fields
      v2 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k2)) columns fields
      v3 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k3)) columns fields
      v4 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k4)) columns fields
      return (v1, v2, v3, v4)

-- | One-off tuple for four-field measurements
instance
  ( KnownSymbol k1, FromJSON v1
  , KnownSymbol k2, FromJSON v2
  , KnownSymbol k3, FromJSON v3
  , KnownSymbol k4, FromJSON v4
  , KnownSymbol k5, FromJSON v5 )
  => QueryResults
    ( Tagged k1 v1, Tagged k2 v2, Tagged k3 v3, Tagged k4 v4
    , Tagged k5 v5
    ) where
    parseResults _ = parseResultsWith $ \_ _ columns fields -> do
      v1 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k1)) columns fields
      v2 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k2)) columns fields
      v3 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k3)) columns fields
      v4 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k4)) columns fields
      v5 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k5)) columns fields
      return (v1, v2, v3, v4, v5)

-- | One-off tuple for five-field measurements
instance
  ( KnownSymbol k1, FromJSON v1
  , KnownSymbol k2, FromJSON v2
  , KnownSymbol k3, FromJSON v3
  , KnownSymbol k4, FromJSON v4
  , KnownSymbol k5, FromJSON v5
  , KnownSymbol k6, FromJSON v6 )
  => QueryResults
    ( Tagged k1 v1, Tagged k2 v2, Tagged k3 v3, Tagged k4 v4
    , Tagged k5 v5, Tagged k6 v6
    ) where
    parseResults _ = parseResultsWith $ \_ _ columns fields -> do
      v1 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k1)) columns fields
      v2 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k2)) columns fields
      v3 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k3)) columns fields
      v4 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k4)) columns fields
      v5 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k5)) columns fields
      v6 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k6)) columns fields
      return (v1, v2, v3, v4, v5, v6)

-- | One-off tuple for six-field measurement
instance
  ( KnownSymbol k1, FromJSON v1
  , KnownSymbol k2, FromJSON v2
  , KnownSymbol k3, FromJSON v3
  , KnownSymbol k4, FromJSON v4
  , KnownSymbol k5, FromJSON v5
  , KnownSymbol k6, FromJSON v6
  , KnownSymbol k7, FromJSON v7 )
  => QueryResults
    ( Tagged k1 v1, Tagged k2 v2, Tagged k3 v3, Tagged k4 v4
    , Tagged k5 v5, Tagged k6 v6, Tagged k7 v7
    ) where
    parseResults _ = parseResultsWith $ \_ _ columns fields -> do
      v1 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k1)) columns fields
      v2 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k2)) columns fields
      v3 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k3)) columns fields
      v4 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k4)) columns fields
      v5 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k5)) columns fields
      v6 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k6)) columns fields
      v7 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k7)) columns fields
      return (v1, v2, v3, v4, v5, v6, v7)

-- | One-off tuple for seven-field measurements
instance
  ( KnownSymbol k1, FromJSON v1
  , KnownSymbol k2, FromJSON v2
  , KnownSymbol k3, FromJSON v3
  , KnownSymbol k4, FromJSON v4
  , KnownSymbol k5, FromJSON v5
  , KnownSymbol k6, FromJSON v6
  , KnownSymbol k7, FromJSON v7
  , KnownSymbol k8, FromJSON v8 )
  => QueryResults
    ( Tagged k1 v1, Tagged k2 v2, Tagged k3 v3, Tagged k4 v4
    , Tagged k5 v5, Tagged k6 v6, Tagged k7 v7, Tagged k8 v8
    ) where
    parseResults _ = parseResultsWith $ \_ _ columns fields -> do
      v1 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k1)) columns fields
      v2 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k2)) columns fields
      v3 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k3)) columns fields
      v4 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k4)) columns fields
      v5 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k5)) columns fields
      v6 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k6)) columns fields
      v7 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k7)) columns fields
      v8 <- parseJSON
        =<< getField (fieldName (Proxy :: Proxy k8)) columns fields
      return (v1, v2, v3, v4, v5, v6, v7, v8)

-- | The full set of parameters for the query API
--
-- Following lenses are available to access its fields:
--
-- * 'server'
-- * 'database'
-- * 'precision'
-- * 'authentication'
-- * 'manager'
data QueryParams = QueryParams
  { queryServer :: !Server
  , queryDatabase :: !Database
  , queryPrecision :: !(Precision 'QueryRequest)
  -- ^ Timestamp precision
  --
  -- InfluxDB uses nanosecond precision if nothing is specified.
  , queryAuthentication :: !(Maybe Credentials)
  -- ^ No authentication by default
  , queryManager :: !(Either HC.ManagerSettings HC.Manager)
  -- ^ HTTP connection manager
  }

-- | Smart constructor for 'QueryParams'
--
-- Default parameters:
--
--   ['server'] 'defaultServer'
--   ['precision'] 'RFC3339'
--   ['authentication'] 'Nothing'
--   ['manager'] @'Left' 'HC.defaultManagerSettings'@
queryParams :: Database -> QueryParams
queryParams queryDatabase = QueryParams
  { queryServer = defaultServer
  , queryPrecision = RFC3339
  , queryAuthentication = Nothing
  , queryManager = Left HC.defaultManagerSettings
  , ..
  }

-- | Query data from InfluxDB.
--
-- It may throw 'InfluxException'.
--
-- If you need a lower-level interface (e.g. to bypass the 'QueryResults'
-- constraint etc), see 'withQueryResponse'.
query :: QueryResults a => QueryParams -> Query -> IO (Vector a)
query params q = withQueryResponse params Nothing q go
  where
    go request response = do
      chunks <- HC.brConsume $ HC.responseBody response
      let body = BL.fromChunks chunks
      case eitherDecode' body of
        Left message -> throwIO $ UnexpectedResponse message request body
        Right val -> case A.parse (parseResults (queryPrecision params)) val of
          A.Success vec -> return vec
          A.Error message -> errorQuery message request response val

setPrecision
  :: Precision 'QueryRequest
  -> [(B.ByteString, Maybe B.ByteString)]
  -> [(B.ByteString, Maybe B.ByteString)]
setPrecision prec qs = maybe qs (\p -> ("epoch", Just p):qs) $
  precisionParam prec

precisionParam :: Precision 'QueryRequest -> Maybe B.ByteString
precisionParam = \case
  Nanosecond -> return "ns"
  Microsecond -> return "u"
  Millisecond -> return "ms"
  Second -> return "s"
  Minute -> return "m"
  Hour -> return "h"
  RFC3339 -> Nothing

-- | Same as 'query' but it instructs InfluxDB to stream chunked responses
-- rather than returning a huge JSON object. This can be lot more efficient than
-- 'query' if the result is huge.
--
-- It may throw 'InfluxException'.
--
-- If you need a lower-level interface (e.g. to bypass the 'QueryResults'
-- constraint etc), see 'withQueryResponse'.
queryChunked
  :: QueryResults a
  => QueryParams
  -> Optional Int
  -- ^ Chunk size
  --
  -- By 'Default', InfluxDB chunks responses by series or by every 10,000
  -- points, whichever occurs first. If it set to a 'Specific' value, InfluxDB
  -- chunks responses by series or by that number of points.
  -> Query
  -> L.FoldM IO (Vector a) r
  -> IO r
queryChunked params chunkSize q (L.FoldM step initialize extract) =
  withQueryResponse params (Just chunkSize) q go
  where
    go request response = do
      x0 <- initialize
      chunk0 <- HC.responseBody response
      x <- loop x0 k0 chunk0
      extract x
      where
        k0 = AB.parse A.json
        loop x k chunk
          | B.null chunk = return x
          | otherwise = case k chunk of
            AB.Fail unconsumed _contexts message ->
              throwIO $ UnexpectedResponse message request $
                BL.fromStrict unconsumed
            AB.Partial k' -> do
              chunk' <- HC.responseBody response
              loop x k' chunk'
            AB.Done leftover val ->
              case A.parse (parseResults (queryPrecision params)) val of
                A.Success vec -> do
                  x' <- step x vec
                  loop x' k0 leftover
                A.Error message -> errorQuery message request response val

-- | Lower-level interface to query data.
withQueryResponse
  :: QueryParams
  -> Maybe (Optional Int)
  -- ^ Chunk size
  --
  -- By 'Nothing', InfluxDB returns all matching data points at once.
  -- By @'Just' 'Default'@, InfluxDB chunks responses by series or by every
  -- 10,000 points, whichever occurs first. If it set to a 'Specific' value,
  -- InfluxDB chunks responses by series or by that number of points.
  -> Query
  -> (HC.Request -> HC.Response HC.BodyReader -> IO r)
  -> IO r
withQueryResponse params chunkSize q f = do
    manager' <- either HC.newManager return $ queryManager params
    HC.withResponse request manager' (f request)
      `catch` (throwIO . HTTPException)
  where
    request =
      HC.setQueryString (setPrecision (queryPrecision params) queryString) $
        queryRequest params
    queryString = addChunkedParam
      [ ("q", Just $ F.fromQuery q)
      , ("db", Just db)
      ]
      where
        !db = TE.encodeUtf8 $ databaseName $ queryDatabase params
    addChunkedParam ps = case chunkSize of
      Nothing -> ps
      Just size ->
        let !chunked = optional "true" (decodeChunkSize . max 1) size
        in ("chunked", Just chunked) : ps
      where
        decodeChunkSize = BL.toStrict . BB.toLazyByteString . BB.intDec


queryRequest :: QueryParams -> HC.Request
queryRequest QueryParams {..} = applyBasicAuth $ HC.defaultRequest
  { HC.host = TE.encodeUtf8 _host
  , HC.port = fromIntegral _port
  , HC.secure = _ssl
  , HC.method = "GET"
  , HC.path = "/query"
  }
  where
    Server {..} = queryServer
    applyBasicAuth =
      case queryAuthentication of
        Nothing -> id
        Just Credentials {..} ->
          HC.applyBasicAuth (TE.encodeUtf8 _user) (TE.encodeUtf8 _password)

errorQuery :: String -> HC.Request -> HC.Response body -> A.Value -> IO a
errorQuery message request response val = do
  let status = HC.responseStatus response
  when (HT.statusIsServerError status) $
    throwIO $ ServerError message
  when (HT.statusIsClientError status) $
    throwIO $ ClientError message request
  throwIO $ UnexpectedResponse
    ("BUG: " ++ message ++ " in Database.InfluxDB.Query.query")
    request
    (encode val)

makeLensesWith
  ( lensRules
    & lensField .~ mappingNamer
      (\name -> case stripPrefix "query" name of
        Just (c:cs) -> ['_':toLower c:cs]
        _ -> [])
    )
  ''QueryParams

-- |
-- >>> let p = queryParams "foo"
-- >>> p ^. server.host
-- "localhost"
instance HasServer QueryParams where
  server = _server

-- |
-- >>> let p = queryParams "foo"
-- >>> p ^. database
-- "foo"
instance HasDatabase QueryParams where
  database = _database

-- | Returning JSON responses contain timestamps in the specified
-- precision/format.
--
-- >>> let p = queryParams "foo"
-- >>> p ^. precision
-- RFC3339
instance HasPrecision 'QueryRequest QueryParams where
  precision = _precision

-- |
-- >>> let p = queryParams "foo" & manager .~ Left HC.defaultManagerSettings
instance HasManager QueryParams where
  manager = _manager

-- | Authentication info for the query
--
-- >>> let p = queryParams "foo"
-- >>> p ^. authentication
-- Nothing
-- >>> let p' = p & authentication ?~ credentials "john" "passw0rd"
-- >>> p' ^. authentication.traverse.user
-- "john"
instance HasCredentials QueryParams where
  authentication = _authentication