sadfzxcv
#!/usr/bin/env python3
import os
import requests
# Dummy logger import
import logger # Provided by your environment
# Example Kdb+ REST endpoint
KDB_URL = "http://kdbserver:8080/execute"
def run_query(query, check_name):
"""
Send a Q query to Kdb+ via REST, returning True on success, False on failure.
We do NOT parse/store the response data because the API handles that.
"""
payload = {
"query": query,
"responseFormat": "json"
}
try:
logger.info(f"Sending query for check '{check_name}' to Kdb.")
resp = requests.post(KDB_URL, json=payload, timeout=30)
resp.raise_for_status() # Raises error if status != 200
logger.info(f"Check '{check_name}' succeeded. HTTP Status {resp.status_code}.")
return True
except requests.exceptions.RequestException as e:
logger.error(f"Check '{check_name}' failed with error: {e}")
return False
def main():
"""
1) Looks in a specified directory (sql_checks_dir) for all .sql files.
2) Each file's name is treated as the "check" label (minus the .sql extension).
3) The content of each file is treated as the Q query to run.
4) We only log success/failure; the REST API handles storing data.
"""
logger.info("=== Starting FX Data Daily Checks ===")
# Directory where your .sql files live
sql_checks_dir = "./checks"
# If needed, make sure the directory exists
if not os.path.isdir(sql_checks_dir):
logger.error(f"SQL checks directory '{sql_checks_dir}' not found.")
return
# List all .sql files in that directory
sql_files = [f for f in os.listdir(sql_checks_dir) if f.endswith(".sql")]
if not sql_files:
logger.info(f"No .sql files found in '{sql_checks_dir}'. Nothing to run.")
return
for sql_file in sql_files:
# The check name is just the filename minus extension
check_name = os.path.splitext(sql_file)[0]
sql_path = os.path.join(sql_checks_dir, sql_file)
logger.info(f"Processing check file: {sql_path}")
# Read the Q query from the file
try:
with open(sql_path, "r", encoding="utf-8") as f:
query = f.read()
except Exception as e:
logger.error(f"Failed to read '{sql_path}': {e}")
continue
# Run the query
success = run_query(query, check_name)
if success:
logger.info(f"Check '{check_name}' completed successfully.")
else:
logger.error(f"Check '{check_name}' encountered an error.")
logger.info("=== End of FX Data Daily Checks ===")
if __name__ == "__main__":
main()
-- Missing_Values.sql
select from fxData
where null timestamps
or null symbol
or null source
or null date
or any each null bidSize
or any each null bidPrice
or any each null offerSize
or any each null offerPrice
-- Zero_Negative_Quotes.sql
select from fxData
where any each bidPrice=0.0
or any each offerPrice=0.0
or any each bidSize<=0.0
or any each offerSize<=0.0
-- Large_Rates.sql
select from fxData
where any each bidPrice>1000.0
or any each offerPrice>1000.0
-- Crazy_Timestamps.sql
select from fxData
where year timestamps < 1980
or year timestamps > 2100
-- Unexpected_Sources.sql
select from fxData
where not source in `Bloomberg`Reuters`MyFeed
-- Duplicates.sql
select duplicates: count i
by timestamps, symbol, source, bidPrice, offerPrice
from fxData
having count i > 1