log-shipper able to generate simple query

Signed-off-by: magic_rb <magic_rb@redalder.org>
This commit is contained in:
magic_rb 2024-10-10 22:12:24 +02:00
parent 30bb0b21ef
commit 819370f987
No known key found for this signature in database
GPG key ID: 08D5287CC5DDCA0E
2 changed files with 44 additions and 17 deletions

View file

@ -18,6 +18,7 @@ import Data.Functor ((<&>))
import Data.HashMap.Strict qualified as HM
import Data.Int
import Data.List qualified as L
import Data.Scientific (Scientific)
import Data.Text (Text)
import Data.Text.Encoding qualified as T
import Data.Vector (Vector)
@ -25,6 +26,7 @@ import Data.Vector qualified as V
import Database.PostgreSQL.Simple (Only, connectPostgreSQL, execute, formatQuery, query)
import Database.PostgreSQL.Simple.ToField (Action, ToField (..))
import Database.PostgreSQL.Simple.Types (Query (..))
import Database.PostgreSQL.Simple.Types qualified as P
import Options.Applicative qualified as O
import System.OsPath (OsPath, encodeUtf)
@ -50,26 +52,44 @@ data Schema = Schema
{ entries :: HM.HashMap A.Key SchemaEntry
}
objectToQuery :: Text -> Schema -> Query
objectToQuery table schema =
schemaToQuery :: Text -> Schema -> Query
schemaToQuery table schema =
Query . BS.unlines $
[ "insert into " <> T.encodeUtf8 table <> "(" <> BS.intercalate ", " (map (T.encodeUtf8 . A.toText) keys) <> ")",
-- "select",
-- BS.intercalate ", \n" (map keyToSelect keys) <> "\n",
-- "from ? as item(data)"
"values (" <> BS.intercalate ", " (replicate (length keys) "?") <> ")"
"select\n " <> BS.intercalate ", \n " (replicate (length keys) "? as ?")
]
where
keyToSelect key = T.encodeUtf8 $ case entry.mapper of
Nothing -> access <> _type <> " as " <> key'
Just mapper -> mapper access <> _type <> " as " <> key'
where
access = "(" <> "data" <> " ->> '" <> key' <> "')"
_type = maybe "" ("::" <>) entry._type
key' = A.toText key
entry = schema.entries HM.! key
keys = HM.keys schema.entries
toQueryValue :: A.Object -> [Action]
toQueryValue obj =
concatMap (\(k, v) -> [toField $ fromAeson v, toField . P.Identifier $ A.toText k]) (KM.toList obj)
data PostgreSQLValue
= Object !A.Object
| Array !A.Array
| String !Text
| Number !Scientific
| Bool !Bool
| Null
instance ToField PostgreSQLValue where
toField :: PostgreSQLValue -> Action
toField obj@(Object _) = toField obj
toField arr@(Array _) = toField arr
toField (String text) = toField text
toField (Number number) = toField number
toField (Bool bool) = toField bool
toField Null = toField P.Null
fromAeson :: A.Value -> PostgreSQLValue
fromAeson (A.Object obj) = Object obj
fromAeson (A.Array arr) = Array arr
fromAeson (A.String text) = String text
fromAeson (A.Number number) = Number number
fromAeson (A.Bool bool) = Bool bool
fromAeson A.Null = Null
someFunc :: IO ()
someFunc = do
let (logLine :: ByteString) = "{\"log\":\"Streaming: receipts -> 4283974 (limited: False, updates: 1, max token: 4283974)\",\"namespace\":\"synapse.replication.tcp.resource\",\"level\":\"INFO\",\"time\":1728503717.28,\"request\":\"replication_notifier-221090\",\"server_name\":\"matrix.redalder.org\"}"
@ -105,6 +125,12 @@ someFunc = do
{ _type = Nothing,
mapper = Nothing
}
),
( "server_name",
SchemaEntry
{ _type = Nothing,
mapper = Nothing
}
)
]
)
@ -115,9 +141,9 @@ someFunc = do
-- print logLine'
-- print q
let query = objectToQuery "synapse_logs" schema
let query = schemaToQuery "synapse_logs" schema
putStrLn . BS.unpack =<< formatQuery conn query (take 5 $ KM.elems logLine')
print =<< execute conn query (take 5 $ KM.elems logLine')
putStrLn . BS.unpack =<< formatQuery conn query (toQueryValue logLine')
print =<< execute conn query (toQueryValue logLine')
pure ()

View file

@ -74,6 +74,7 @@ library
postgresql-simple,
filepath,
aeson,
scientific,
text,
bytestring,
vector,