I am trying to pass a query with a PLC tag to DB2 on a loop with a function, but PLC tag path messes up the query in Python [closed]

1 day ago 1
ARTICLE AD BOX

I have a loop running with two components, one that runs on separate timers, the other that runs queries when Aveva PI tags have specific values, the rest of the logic is fine I believe, it's just a syntax error with passing queries to DB2. I have tried moving around things, using r"""query""" and str("") and str(r"""query""") and I always get the error. I also have tried running the queries without the python on the DB2 and it worked fine. But that was formatted slightly differently since I wasn't having to bypass issues with python syntax and escape character and the "#"

[IBM][System i Access ODBC Driver][DB2 for i5/OS]SQL0104 - Token .64 was not valid. Valid tokens: FOR USE SKIP WAIT WITH FETCH LIMIT ORDER UNION EXCEPT OFFSET. (-104) (SQLExecDirectW)')

This is my code.

import threading import time import pyodbc import mssql_python import requests import logging logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] (%(threadName)s) %(message)s' ) # --- Connection --- DB2_CONNECTION_STRING = ( "DRIVER={iSeries Access ODBC Driver};" "DATABASE=DB2;" "SYSTEM=192.168.1.10;" "PORT=446;" "UID=USERNAME;" "PWD=PASSWORD;" "ConnectTimeout=10;" "LongDataCompat=1;" "TrimChar=1;" ) LOCAL_SQL_CONNECTION_STRING = ( "Server=SERVER\\TEST;" "Database=DB;" "Authentication=ActiveDirectoryIntegrated;" "Encrypt=yes;" "TrustServerCertificate=yes;" ) # ===================================================================== # CORE BASE WORKER CLASS (Direct Static String Execution Engine) # ===================================================================== class BaseWorker(threading.Thread): def __init__(self, target_table, query): super().__init__() self.name = f"Worker-{target_table}" self.target_table = target_table self.query = query.strip() self.daemon = True self.DB2_conn = None def _get_DB2_connection(self): """Maintains clean connection to the AS400 source.""" delay = 5 while self.DB2_conn is None: try: logging.info(f"Connecting to AS400 for {self.target_table}...") self.DB2_conn = pyodbc.connect(DB2_CONNECTION_STRING) self.DB2_conn.autocommit = True logging.info(f"Successfully connected to AS400 for {self.target_table}.") except pyodbc.Error as e: logging.error(f"AS400 Connection failed: {e}. Retrying in {delay}s...") time.sleep(delay) delay = min(delay * 2, 60) def _save_to_local_sql(self, data, DB2_description): """Wipes old target data and streams fresh rows cleanly into local SQL Express destination.""" if not data: return try: column_names = [col[0] for col in DB2_description] cols_str = ", ".join([f'"{c}"' for c in column_names]) placeholders = ",".join(["?" for _ in column_names]) # Formulate truncate and insert queries truncate_sql = f"TRUNCATE TABLE {self.target_table}" insert_sql = f"INSERT INTO {self.target_table} ({cols_str}) VALUES ({placeholders})" with mssql_python.connect(LOCAL_SQL_CONNECTION_STRING) as local_conn: with local_conn.cursor() as cursor: # 1. Clear out the previous interval's records cursor.execute(truncate_sql) # 2. Bulk insert the fresh batch of records cursor.executemany(insert_sql, data) local_conn.commit() logging.info(f"Successfully overwrote table with {len(data)} new rows in {self.target_table}.") except Exception as e: logging.error(f"mssql-python Target Save Error for {self.target_table}: {e}") def run_query_and_load(self): """Executes the query string using direct system execution.""" if self.DB2_conn is None: self._get_DB2_connection() if self.DB2_conn is None: return try: with self.DB2_conn.cursor() as cursor: cursor.execute(self.query) if self.query.upper().startswith("SELECT"): if cursor.description: results = cursor.fetchall() if results: cleaned_data = [] for row in results: cleaned_data.append(tuple(str(item).strip() if isinstance(item, str) else item for item in row)) self._save_to_local_sql(cleaned_data, cursor.description) else: logging.info(f"Command executed successfully for {self.target_table}.") except (pyodbc.InterfaceError, pyodbc.OperationalError) as e: logging.error(f"AS400 Connection dropped on {self.target_table}: {e}. Reconnecting...") self.DB2_conn = None except pyodbc.Error as e: logging.error(f"AS400 Driver execution error on {self.target_table}: {e}") except Exception as e: logging.error(f"Unexpected data processing bug on {self.target_table}: {e}") # ===================================================================== # THREAD EXECUTION IMPLEMENTATIONS # ===================================================================== class PLCTimedWorker(BaseWorker): """Loops database synchronization based strictly on a time interval.""" def __init__(self, interval, target_table, query): super().__init__(target_table, query) self.interval = interval def run(self): logging.info(f"Spawned Timed Worker for {self.target_table} ({self.interval}s)") self._get_DB2_connection() while True: start_time = time.time() self.run_query_and_load() elapsed = time.time() - start_time time.sleep(max(0.01, self.interval - elapsed)) # ===================================================================== # PRODUCTION ROUTING ARRAYS # ===================================================================== Inspection_itmnumber = "'192.168.1.30\Inspection_itmNumber'" Actitm_Spec_itm = "'192.168.1.30\Actitm_Spec_itm'" column = "REV#" timed_query1 = 'SELECT "FYCLR" FROM "DB2"."P025" WHERE "FYSTYL" = ' + Inspection_itmnumber timed_query2 = r"""SELECT "FWFSEQ","FWCLTF","FWCLTI","FWWSF3","FWWSI3" FROM "DB2"."P750S" WHERE "FWMACH" = 'LOC1' AND "FW" = """ + Actitm_Spec_itm + """ ORDER BY "FWFSEQ" ASC""" query3 = r"""SELECT "SBK" FROM "DB2"."P010" WHERE "ITM" = '192.168.1.30\itmUp_itmNumber'""" # Structure: (Interval, Target_Table_Name, SQL_Query) TIMED_TASKS = [ (1.0, "010", timed_query1), (0.5, "T010", timed_query2), (1.0, "I010", query3), ] if __name__ == "__main__": logging.info("--- Starting Production Ingestion Engine ---") active_threads = [] for interval, target_table, sql in TIMED_TASKS: t_worker = PLCTimedWorker(interval, target_table, sql) t_worker.start() active_threads.append(t_worker) time.sleep(0.04) try: while True: time.sleep(1) except KeyboardInterrupt: logging.info("Shutting down pipelines safely...")
Read Entire Article