{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoFieldSelectors #-}

-- | Inserts a JSON-encoded log line into a PostgreSQL table, using a
-- 'Schema' to decide which JSON keys become columns.
module MyLib (someFunc) where

import Control.Arrow (left)
import Control.Exception (Exception, bracket, throwIO)
import Data.Aeson qualified as A
import Data.Aeson.Key qualified as A
import Data.Aeson.KeyMap qualified as KM
import Data.ByteString.Char8 qualified as BS
import Data.ByteString.Lazy (ByteString)
import Data.Either
import Data.Function ((&))
import Data.Functor ((<&>))
import Data.HashMap.Strict qualified as HM
import Data.Int
import Data.List qualified as L
import Data.Scientific (Scientific)
import Data.Text (Text)
import Data.Text qualified as Text
import Data.Text.Encoding qualified as T
import Data.Vector (Vector)
import Data.Vector qualified as V
import Database.PostgreSQL.Simple (Only, close, connectPostgreSQL, execute, formatQuery, query)
import Database.PostgreSQL.Simple.ToField (Action, ToField (..))
import Database.PostgreSQL.Simple.Types (Query (..))
import Database.PostgreSQL.Simple.Types qualified as P
import Options.Applicative qualified as O
import System.OsPath (OsPath, encodeUtf)

-- | Command-line options: the value of @--log@ / @-l@, encoded as an 'OsPath'.
data Options = Options {log :: OsPath}

-- | Parser for 'Options'. A path that 'encodeUtf' rejects is reported as an
-- option-reader error, using the 'show'n exception as the message.
parseOptions :: O.Parser Options
parseOptions =
  Options
    <$> O.option
      (O.eitherReader (\path -> show `left` encodeUtf path))
      (O.long "log" <> O.short 'l')

-- | Exception carrying a human-readable message.
data UserException = UserException Text
  deriving (Show)

instance Exception UserException

-- | Per-column settings of a 'Schema'.
--
-- NOTE(review): neither field is consumed by 'schemaToQuery' yet. Judging
-- by the sample schema, '_type' is presumably an SQL type to cast to and
-- 'mapper' a rewrite of the placeholder expression -- confirm before wiring
-- them into the generated SQL.
data SchemaEntry = SchemaEntry
  { _type :: Maybe Text,
    mapper :: Maybe (Text -> Text)
  }

-- | Maps JSON keys (which double as column names) to their settings.
data Schema = Schema
  { entries :: HM.HashMap A.Key SchemaEntry
  }

-- | The schema's keys in ascending order.
--
-- 'schemaToQuery' and 'toQueryValue' must both go through this function:
-- @INSERT ... SELECT@ matches columns by position, so the column list and
-- the parameter list have to agree on order, and a 'HM.HashMap' on its own
-- gives no ordering that the parameter side could reproduce.
schemaKeys :: Schema -> [A.Key]
schemaKeys schema = L.sort (HM.keys schema.entries)

-- | Builds an @insert into TABLE(cols) select ? as ?, ...@ statement with one
-- @? as ?@ pair (value, then column alias) per schema key.
--
-- The table name and column names are spliced into the SQL text verbatim, so
-- they must come from trusted input. The parameters for the placeholders are
-- produced by 'toQueryValue' with the same schema.
schemaToQuery :: Text -> Schema -> Query
schemaToQuery table schema =
  Query . BS.unlines $
    [ "insert into " <> T.encodeUtf8 table <> "(" <> BS.intercalate ", " columns <> ")",
      "select\n " <> BS.intercalate ", \n " ("? as ?" <$ columns)
    ]
  where
    columns = map (T.encodeUtf8 . A.toText) (schemaKeys schema)

-- | Produces the parameters for the statement built by 'schemaToQuery': for
-- every schema key, in 'schemaKeys' order, the value followed by the column
-- name as an identifier.
--
-- Keys of the object that are not in the schema are ignored; schema keys that
-- are missing from the object are sent as SQL @NULL@. This keeps the number
-- and order of parameters in step with the placeholders.
toQueryValue :: Schema -> A.Object -> [Action]
toQueryValue schema obj = concatMap column (schemaKeys schema)
  where
    column key =
      [ toField (maybe Null fromAeson (KM.lookup key obj)),
        toField (P.Identifier (A.toText key))
      ]

-- | A JSON value, mirrored constructor-for-constructor so that it can have
-- its own 'ToField' instance.
data PostgreSQLValue
  = Object !A.Object
  | Array !A.Array
  | String !Text
  | Number !Scientific
  | Bool !Bool
  | Null

instance ToField PostgreSQLValue where
  toField :: PostgreSQLValue -> Action
  -- Objects and arrays are rendered through the 'ToField' instance for aeson
  -- values. Calling 'toField' on the 'PostgreSQLValue' itself here would
  -- recurse forever.
  toField (Object obj) = toField (A.Object obj)
  toField (Array arr) = toField (A.Array arr)
  toField (String text) = toField text
  toField (Number number) = toField number
  toField (Bool bool) = toField bool
  toField Null = toField P.Null

-- | Converts an aeson value to the matching 'PostgreSQLValue' constructor.
fromAeson :: A.Value -> PostgreSQLValue
fromAeson (A.Object obj) = Object obj
fromAeson (A.Array arr) = Array arr
fromAeson (A.String text) = String text
fromAeson (A.Number number) = Number number
fromAeson (A.Bool bool) = Bool bool
fromAeson A.Null = Null

-- | Hard-coded sample log line used by 'someFunc'.
sampleLogLine :: ByteString
sampleLogLine = "{\"log\":\"Streaming: receipts -> 4283974 (limited: False, updates: 1, max token: 4283974)\",\"namespace\":\"synapse.replication.tcp.resource\",\"level\":\"INFO\",\"time\":1728503717.28,\"request\":\"replication_notifier-221090\",\"server_name\":\"matrix.redalder.org\"}"

-- | Schema for the @synapse_logs@ table used by 'someFunc'.
synapseLogSchema :: Schema
synapseLogSchema =
  Schema
    ( HM.fromList
        [ ("log", plain),
          ("namespace", plain),
          ( "time",
            SchemaEntry
              { _type = Nothing,
                mapper = Just (\x -> "to_timestamp(" <> x <> "::float)")
              }
          ),
          ( "level",
            SchemaEntry
              { _type = Just "synapse_log_level",
                mapper = Nothing
              }
          ),
          ("request", plain),
          ("server_name", plain)
        ]
    )
  where
    plain = SchemaEntry {_type = Nothing, mapper = Nothing}

-- | Decodes 'sampleLogLine', prints the fully formatted insert statement and
-- then executes it, printing the number of affected rows.
--
-- Throws 'UserException' (carrying the aeson error message) if the log line
-- is not a JSON object. The database connection is closed on every exit
-- path, including exceptions.
someFunc :: IO ()
someFunc = do
  logObject <- either decodeFailure pure (A.eitherDecode sampleLogLine)
  let insertQuery = schemaToQuery "synapse_logs" synapseLogSchema
      params = toQueryValue synapseLogSchema logObject
  bracket (connectPostgreSQL connectionString) close $ \conn -> do
    putStrLn . BS.unpack =<< formatQuery conn insertQuery params
    print =<< execute conn insertQuery params
  where
    connectionString = "host='localhost' port='5432' user='main' password='password'"
    decodeFailure err =
      throwIO (UserException ("failed to decode log line: " <> Text.pack err))