-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathMSSQLODBCTest.py
93 lines (87 loc) · 2.74 KB
/
MSSQLODBCTest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#pip install pyodbc
import pyodbc
import sys
strServer = "localhost"
strInitialDB = "Qualys_Portal"
strSQL = "select * from tblNetBlocks;"
def SQLConn (strServer,strInitialDB):
try:
# Open database connection
return pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+strServer+';DATABASE='+strInitialDB+';Trusted_Connection=yes;')
except pyodbc.InternalError as err:
print ("Error: unable to connect: {}".format(err))
sys.exit(5)
except pyodbc.OperationalError as err:
print ("Operational Error: unable to connect: {}".format(err))
sys.exit(5)
except pyodbc.ProgrammingError as err:
print ("Programing Error: unable to connect: {}".format(err))
sys.exit(5)
# except Exception as err:
# print ("Unknown Error: unable to connect: {}".format(err))
# sys.exit(5)
def SQLQuery (strSQL,db):
global strHeaders
try:
# prepare a cursor object using cursor() method
dbCursor = db.cursor()
# Execute the SQL command
dbCursor.execute(strSQL)
# Capture headers
for temp in dbCursor.description:
strHeaders += temp[0] + ","
if strHeaders[-1]==",":
strHeaders = strHeaders[:-1]
# Count rows
if strSQL[:6].lower() == "select":
dbResults = dbCursor.fetchall()
print ("dbResults type is: {}".format(type(dbResults)))
else:
db.commit()
dbResults = ()
iRowCount = dbCursor.rowcount
return [iRowCount,dbResults]
except pyodbc.InternalError as err:
if strSQL[:6].lower() != "select":
db.rollback()
return "Internal Error: unable to execute: {}".format(err)
except pyodbc.ProgrammingError as err:
if strSQL[:6].lower() != "select":
db.rollback()
return "Programing Error: unable to execute: {}".format(err)
except pyodbc.OperationalError as err:
if strSQL[:6].lower() != "select":
db.rollback()
return "Programing Error: unable to execute: {}".format(err)
except pyodbc.IntegrityError as err:
if strSQL[:6].lower() != "select":
db.rollback()
return "Integrity Error: unable to execute: {}".format(err)
# except Exception as err:
# return "Unknown Error: unable to execute: {}".format(err)
def ValidReturn(lsttest):
if isinstance(lsttest,list):
if len(lsttest) == 2:
if isinstance(lsttest[0],int) and isinstance(lsttest[1],list):
return True
else:
return False
else:
return False
else:
return False
strHeaders = ""
print ("Executing: {}".format(strSQL))
db = SQLConn (strServer,strInitialDB)
lstReturn = SQLQuery (strSQL,db)
if ValidReturn(lstReturn):
print ("Rows affected: {}".format(lstReturn[0]))
print ("Result set size: {}".format(len(lstReturn[1])))
print (strHeaders)
for row in lstReturn[1] :
# print ("Row is of type {}".format(type(row)))
print (" ".join(map(str,row)))
else:
print ("Unexpected: {} \n{}".format(lstReturn,strSQL))
# disconnect from server
db.close()