python中怎么利用cx_Oracle连接oracle数据库
本篇文章为大家展示了python中怎么利用cx_Oracle连接oracle数据库,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
“专业、务实、高效、创新、把客户的事当成自己的事”是我们每一个人一直以来坚持追求的企业文化。 成都创新互联是您可以信赖的网站建设服务商、专业的互联网服务提供商! 专注于网站设计制作、做网站、软件开发、设计服务业务。我们始终坚持以客户需求为导向,结合用户体验与视觉传达,提供有针对性的项目解决方案,提供专业性的建议,创新互联建站将不断地超越自我,追逐市场,引领市场!
直接使用:
CheckOracle.py -H [remote_home] -u [oracle_user] -p [oracle_password] -s [oracle_SID] -i [information] [table_name]
具体使用细节和例子,使用-h获取帮助
其他python调用:
1 可以在实例化对象时指定参数
2 调用的函数为OraInfo()
3 返回结果集
#!/usr/bin/env python3
#_*_coding:utf-8_*_
#Auth by raysuen
#v1
import cx_Oracle
import sys,re,datetime
class OracleConn(object): #封装类
#存放连接信息的
Info = {
"host":"127.0.0.1", #远端oracle的host
"port":1521, #远端oracle的port
"OraSID":None, #远端oracle的SID
"OraUser":None, #远端oracle的username
"OraPasswd":None, #远端oracle的username对应的密码
# "OraSchemas":None,
"OraInfo":None, #想要获取远端数据库的那种信息
"OraTable":None #有关系的oracle的表名
}
def __init__(self,host=None,port=None,sid=None,orauser=None,orapwd=None,orainfo=None,oratable=None):
if host != None:
self.Info["host"] = host
if sid != None:
self.Info["OraSID"] = sid
if port != None:
self.Info["port"] = port
if orauser != None:
self.Info["OraUser"] = orauser
if orapwd != None:
self.Info["OraPasswd"] = orapwd
if orainfo != None:
self.Info["OraInfo"] = orainfo
if oratable != None:
self.Info["OraTable"] = oratable
def Check_Info(self): #判断Info字典里面的key是否存在必要的值
if self.Info["OraUser"] == None:
print("You must specify a oracle username for connecting oracle.")
print("If you don't know how to specify the parameters.You can -h to get help")
exit(3)
if self.Info["OraPasswd"] == None:
print("You must specify a oracle password for connecting oracle.")
print("If you don't know how to specify the parameters.You can -h to get help")
exit(3)
if self.Info["OraSID"] == None:
print("You must specify a oracle SID for connecting oracle.")
print("If you don't know how to specify the parameters.You can -h to get help")
exit(3)
if self.Info["OraInfo"] == None:
print("You must specify a Information about oracle")
print("If you don't know how to specify the parameters.You can -h to get help")
exit(3)
def ConnectOracle(self): #封装连接数据库的连接,并返回连接对象
try:
tnsname = cx_Oracle.makedsn(self.Info["host"], self.Info["port"], self.Info["OraSID"])
ora = cx_Oracle.connect(self.Info["OraUser"], self.Info["OraPasswd"], tnsname)
except Exception as e:
print(e)
exit(4)
return ora
def CloseOracle(self,oraCon): #封装管理数据库连接的函数
oraCon.close()
def ExecSql(self,SqlStr): #封装数据执行sql的函数
try:
ora = self.ConnectOracle()
cursor = ora.cursor()
cursor.execute(SqlStr)
res = cursor.fetchall()
cursor.close
self.CloseOracle(ora)
except Exception as e:
print(e)
exit(5)
return res
def GetTableSpace(self): #获取tablespace信息的函数
sqlStr="""
select a.tablespace_name,
round(a.bytes_alloc / 1024 / 1024, 2) megs_alloc,
round(nvl(b.bytes_free, 0) / 1024 / 1024, 2) megs_free,
round((a.bytes_alloc - nvl(b.bytes_free, 0)) / 1024 / 1024, 2) megs_used,
round((nvl(b.bytes_free, 0) / a.bytes_alloc) * 100,2)||'%' Pct_Free,
100 - round((nvl(b.bytes_free, 0) / a.bytes_alloc) * 100,2)||'%' Pct_used,
round(maxbytes/1048576,2) Max,
round(round((a.bytes_alloc - nvl(b.bytes_free, 0)) / 1024 / 1024, 2) / round((case maxbytes when 0 then a.bytes_alloc else maxbytes end)/1048576,2) * 100,2) || '%' "USED_MAX%"
from ( select f.tablespace_name,
sum(f.bytes) bytes_alloc,
sum(decode(f.autoextensible, 'YES',f.maxbytes,'NO', f.bytes)) maxbytes
from dba_data_files f
group by tablespace_name) a,
( select f.tablespace_name,
sum(f.bytes) bytes_free
from dba_free_space f
group by tablespace_name) b
where a.tablespace_name = b.tablespace_name (+)
union all
select h.tablespace_name,
round(sum(h.bytes_free + h.bytes_used) / 1048576, 2) megs_alloc,
round(sum((h.bytes_free + h.bytes_used) - nvl(p.bytes_used, 0)) / 1048576, 2) megs_free,
round(sum(nvl(p.bytes_used, 0))/ 1048576, 2) megs_used,
round((sum((h.bytes_free + h.bytes_used) - nvl(p.bytes_used, 0)) / sum(h.bytes_used + h.bytes_free)) * 100,2)||'%' Pct_Free,
100 - round((sum((h.bytes_free + h.bytes_used) - nvl(p.bytes_used, 0)) / sum(h.bytes_used + h.bytes_free)) * 100,2)||'%' pct_used,
round(sum(f.maxbytes) / 1048576, 2) max,
round(round(sum(nvl(p.bytes_used, 0))/ 1048576, 2)/round(sum(case f.maxbytes when 0 then (h.bytes_free + h.bytes_used) else f.maxbytes end) / 1048576, 2) * 100,2)||'%' "USED_MAX%"
from sys.v_$TEMP_SPACE_HEADER h, sys.v_$Temp_extent_pool p, dba_temp_files f
where p.file_id(+) = h.file_id
and p.tablespace_name(+) = h.tablespace_name
and f.file_id = h.file_id
and f.tablespace_name = h.tablespace_name
group by h.tablespace_name
ORDER BY 1
"""
res = self.ExecSql(sqlStr)
return res
def PrintTablespace(self,res):
headStr = """|%s|%s|%s|%s|%s|%s|%s|%s|"""%("tablespace_name".center(30),"megs_alloc".center(15),"megs_free".center(15),"megs_used".center(15),"Pct_Free".center(15),"Pct_used".center(15),"Max_MB".center(15),"USED_MAX_PCT".center(15))
print("%s"%"".center(144,"-"))
print(headStr)
print("%s" % "".center(144, "-"))
for t in res:
print("|%s|%s|%s|%s|%s|%s|%s|%s|"%(t[0].center(30),str(t[1]).center(15),str(t[2]).center(15),str(t[3]).center(15),t[4].center(15),t[5].center(15),str(t[6]).center(15),t[7].center(15)))
print("%s" % "".center(144, "-"))
def GetAsmDiskGroup(self):
sqlStr="""select name,total_mb,free_mb from v$asm_diskgroup"""
res = self.ExecSql(sqlStr)
return res
def PrintAsmDiskGroup(self,res):
print("%s" % "".center(50, "-"))
print("|%s|%s|%s|"%("GROUP_NAME".center(20),"TOTAL_MB".center(15),"FREE_MB".center(15)))
print("%s" % "".center(50, "-"))
for t in res:
print("|%s|%s|%s|"%(t[0].center(20),str(t[1]).center(15),str(t[2]).center(15)))
print("%s" % "".center(50, "-"))
def GetRedo(self):
sqlStr="""
select a.group#,a.BYTES/1024/1024 mb,b.MEMBER,a.thread#,a.sequence#,a.members,a.archived,a.status,a.first_time,a.next_time from gv$log a,gv$logfile b where a.GROUP#=b.GROUP# group by a.group#,a.thread#,a.BYTES/1024/1024,b.MEMBER,a.sequence#,a.members,a.archived,a.status,a.first_time,a.next_time order by group#
"""
res = self.ExecSql(sqlStr)
return res
def PrintRedo(self,res):
print("%s" % "".center(148, "-"))
print("|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|"%("GROUP".center(5),"SIZE_MB".center(7),"MEMBER".center(50),"THREAD".center(6),"SEQUENCE".center(8),"MEMBERS".center(7),"ARCHIVED".center(8),"STATUS".center(8),"FIRST_TIME".center(19),"NEXT_TIME".center(19)))
print("%s" % "".center(148, "-"))
for t in res:
if t[9] == None:
tStr = "|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|"%(str(t[0]).center(5),str(t[1]).center(7),t[2].center(50),str(t[3]).center(6),str(t[4]).center(8),str(t[5]).center(7),t[6].center(8),t[7].center(8),datetime.datetime.strftime(t[8],"%Y-%m-%d %H:%M:%S"),"None".center(19))
else:
tStr = "|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|" % (
str(t[0]).center(5), str(t[1]).center(7), t[2].center(50), str(t[3]).center(6), str(t[4]).center(8),
str(t[5]).center(7), t[6].center(8), t[7].center(8),
datetime.datetime.strftime(t[8], "%Y-%m-%d %H:%M:%S"),
datetime.datetime.strftime(t[9], "%Y-%m-%d %H:%M:%S"))
print(tStr)
print("%s" % "".center(148, "-"))
def GetRedoShift(self):
sqlStr = """
SELECT
to_char(first_time,'YYYY-MM-DD') day,
to_char(sum(decode(to_char(first_time,'HH24'),'00',1,0)),'999') "00",
to_char(sum(decode(to_char(first_time,'HH24'),'01',1,0)),'999') "01",
to_char(sum(decode(to_char(first_time,'HH24'),'02',1,0)),'999') "02",
to_char(sum(decode(to_char(first_time,'HH24'),'03',1,0)),'999') "03",
to_char(sum(decode(to_char(first_time,'HH24'),'04',1,0)),'999') "04",
to_char(sum(decode(to_char(first_time,'HH24'),'05',1,0)),'999') "05",
to_char(sum(decode(to_char(first_time,'HH24'),'06',1,0)),'999') "06",
to_char(sum(decode(to_char(first_time,'HH24'),'07',1,0)),'999') "07",
to_char(sum(decode(to_char(first_time,'HH24'),'08',1,0)),'999') "08",
to_char(sum(decode(to_char(first_time,'HH24'),'09',1,0)),'999') "09",
to_char(sum(decode(to_char(first_time,'HH24'),'10',1,0)),'999') "10",
to_char(sum(decode(to_char(first_time,'HH24'),'11',1,0)),'999') "11",
to_char(sum(decode(to_char(first_time,'HH24'),'12',1,0)),'999') "12",
to_char(sum(decode(to_char(first_time,'HH24'),'13',1,0)),'999') "13",
to_char(sum(decode(to_char(first_time,'HH24'),'14',1,0)),'999') "14",
to_char(sum(decode(to_char(first_time,'HH24'),'15',1,0)),'999') "15",
to_char(sum(decode(to_char(first_time,'HH24'),'16',1,0)),'999') "16",
to_char(sum(decode(to_char(first_time,'HH24'),'17',1,0)),'999') "17",
to_char(sum(decode(to_char(first_time,'HH24'),'18',1,0)),'999') "18",
to_char(sum(decode(to_char(first_time,'HH24'),'19',1,0)),'999') "19",
to_char(sum(decode(to_char(first_time,'HH24'),'20',1,0)),'999') "20",
to_char(sum(decode(to_char(first_time,'HH24'),'21',1,0)),'999') "21",
to_char(sum(decode(to_char(first_time,'HH24'),'22',1,0)),'999') "22",
to_char(sum(decode(to_char(first_time,'HH24'),'23',1,0)),'999') "23"
from
v$log_history
GROUP by
to_char(first_time,'YYYY-MM-DD') order by day desc
"""
res = self.ExecSql(sqlStr)
return res
def PrintRedoShift(self,res):
print("%s" % "".center(132, "-"))
print("|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|"%("DAY".center(10),"00".center(4),"01".center(4),"02".center(4),"03".center(4),"04".center(4),"05".center(4),"06".center(4),"07".center(4),"08".center(4),"09".center(4),"10".center(4),"11".center(4),"12".center(4),"13".center(4),"14".center(4),"15".center(4),"16".center(4),"17".center(4),"18".center(4),"19".center(4),"20".center(4),"21".center(4),"22".center(4),"23".center(4)))
print("%s" % "".center(132, "-"))
for t in res:
print("|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|"%(t[0].center(10),t[1].center(4),t[2].center(4),t[3].center(4),t[4].center(4),t[5].center(4),t[6].center(4),t[7].center(4),t[8].center(4),t[9].center(4),t[10].center(4),t[11].center(4),t[12].center(4),t[12].center(4),t[13].center(4),t[14].center(4),t[15].center(4),t[16].center(4),t[17].center(4),t[18].center(4),t[19].center(4),t[20].center(4),t[21].center(4),t[22].center(4),t[23].center(4)))
print("%s" % "".center(132, "-"))
def GetExecNow(self):
sqlStr = """
select distinct b.SID,b.SERIAL#,p.SPID,b.LAST_CALL_ET,a.sql_id, a.sql_text,b.status,b.event,b.MODULE, b.OSUSER,b.MACHINE from v$sql a,v$session b,v$process p where a.SQL_ID=b.SQL_ID and b.PADDR=p.ADDR and b.STATUS='ACTIVE' order by B.LAST_CALL_ET desc
"""
res = self.ExecSql(sqlStr)
return res
def PrintExecNow(self,res):
print("%s" % "".center(188, "-"))
print("|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|"%("SID".center(6),"SERIAL".center(6),"SPID".center(5),"LAST_ET".center(7),"SQL_ID".center(15),"SQL_TEXT".center(60),"STATUS".center(7),"EVENT".center(30),"MODULE".center(15),"OSUSER".center(10),"MACHINE".center(15)))
print("%s" % "".center(188, "-"))
for t in res:
print("|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|"%(str(t[0]).center(6),str(t[1]).center(6),t[2].center(5),str(t[3]).center(7),t[4].center(15),t[5].strip()[0:60].center(60),t[6].center(7),t[7][0:30].center(30),t[8][0:15].center(15),t[9].center(10),t[10][0:15].center(15)))
print("%s" % "".center(188, "-"))
def GetIndexInfo(self):
if self.Info["OraTable"] == None:
print("You must specify a oracle table to get indexs information of table.")
print("If you don't know how to specify the parameters.You can -h to get help")
exit(6)
sqlStr = """
select
table_name,
TABLE_TYPE,
INDEX_NAME,
INDEX_TYPE,
TABLE_OWNER,
max(columns) columns
from
(SELECT
ui.table_name,
ui.TABLE_TYPE,
ui.INDEX_NAME,
ui.INDEX_TYPE,
uic.TABLE_OWNER,
to_char(wm_concat (uic.COLUMN_NAME)
over(partition by ui.table_name,ui.TABLE_TYPE,ui.INDEX_NAME,ui.INDEX_TYPE,uic.TABLE_OWNER order by uic.COLUMN_POSITION)) columns
FROM
dba_indexes ui,
dba_IND_COLUMNS uic
WHERE
ui.INDEX_NAME (+) = uic.INDEX_NAME
AND ui.TABLE_NAME = UPPER ('%s'))
GROUP BY
table_name,
TABLE_TYPE,
INDEX_NAME,
INDEX_TYPE,
TABLE_OWNER
"""%self.Info["OraTable"]
res = self.ExecSql(sqlStr)
return res
def PrintIndexInfo(self,res):
print("%s" % "".center(142, "-"))
print("|%s|%s|%s|%s|%s|%s|"%("TABLE_NAME".center(20),"TABLE_TYPE".center(20),"INDEX_NAME".center(25),"INDEX_TYPE".center(15),"TABLE_OWNER".center(15),"COLUMNS".center(40)))
print("%s" % "".center(142, "-"))
for t in res:
print("|%s|%s|%s|%s|%s|%s|" % (t[0].center(20), t[1].center(20), t[2].center(25), t[3].center(15), t[4].center(15), t[5].center(40)))
print("%s" % "".center(142, "-"))
def GetPartInfo(self):
if self.Info["OraTable"] == None:
print("You must specify a oracle table to get indexs information of table.")
print("If you don't know how to specify the parameters.You can -h to get help")
exit(6)
sqlStr = """
SELECT
a.TABLE_OWNER,
a.table_name,
c. M,
a.PARTITION_NAME,
a.HIGH_VALUE,
a.NUM_ROWS,
a.TABLESPACE_NAME,
b.COLUMN_NAME,
A .LAST_ANALYZED
FROM
dba_TAB_PARTITIONS A,
DBA_PART_KEY_COLUMNS b,
(
SELECT
SUM (bytes / 1024 / 1024) M,
segment_name,
partition_name
FROM
dba_segments
WHERE segment_type LIKE '%%TABLE%%'
AND partition_name IS NOT NULL
and segment_name = upper('%s')
GROUP BY
segment_name,
partition_name
ORDER BY
segment_name,
partition_name DESC
) c
WHERE
A .TABLE_NAME = b. NAME (+)
AND A .table_name = c.SEGMENT_NAME
AND A .partition_name = c.PARTITION_NAME
AND A .table_name = upper('%s')
ORDER BY
A .TABLE_NAME,
partition_name DESC
"""%(self.Info["OraTable"],self.Info["OraTable"])
res = self.ExecSql(sqlStr)
return res
def PrintPartInfo(self,res):
print("%s" % "".center(184, "-"))
print("|%s|" % "TABLE_OWNER".center(15), end='')
print("%s|" % "TABLE_NAME".center(25), end='')
print("%s|" % "MB".center(5), end='')
print("%s|" % "PARTITION_NAME".center(20), end='')
print("%s|" % "HIGH_VALUE".center(40), end='')
print("%s|" % "NUM_ROWS".center(10), end='')
print("%s|" % "TABLESPACE".center(20), end='')
print("%s|" % "CULUMN_NAME".center(20), end='')
print("%s|" % "LAST_ANALYZED".center(19))
print("%s" % "".center(184, "-"))
for t in res:
print("|%s|"%t[0].center(15),end='')
print("%s|"%t[1].center(25), end='')
print("%s|"%str(t[2]).center(5), end='')
print("%s|"%t[3].center(20), end='')
print("%s|"%t[4][0:40].center(40), end='')
print("%s|"%str(t[5]).center(10), end='')
print("%s|"%t[6].center(20), end='')
if t[7] == None:
print("%s|"%"None".center(20))
else:
print("%s|"%t[7].center(20), end='')
print("%s|"%(datetime.datetime.strftime(t[8],"%Y-%m-%d %H:%M:%S")))
print("%s" % "".center(184, "-"))
def OraInfo(self): #这个函数不格式化打印返回结果,只返回结果集
self.Check_Info()
if self.Info["OraInfo"].upper() == "TABLESPACE":
self.GetTableSpace()
elif self.Info["OraInfo"].upper() == "ASMDISKGROUP":
self.GetAsmDiskGroup()
elif self.Info["OraInfo"].upper() == "REDO":
self.GetRedo()
elif self.Info["OraInfo"].upper() == "REDOSHIFT":
self.GetRedoShift()
elif self.Info["OraInfo"].upper() == "EXECNOW":
self.GetExecNow()
elif self.Info["OraInfo"].upper() == "INDEXINFO":
self.GetIndexInfo()
elif self.Info["OraInfo"].upper() == "PARTITIONINFO":
self.GetPartInfo()
else:
print("Please enter valid value for -i")
exit(6)
def PrintOraInfo(self):
self.Check_Info()
if self.Info["OraInfo"].upper() == "TABLESPACE":
self.PrintTablespace(self.GetTableSpace())
elif self.Info["OraInfo"].upper() == "ASMDISKGROUP":
self.PrintAsmDiskGroup(self.GetAsmDiskGroup())
elif self.Info["OraInfo"].upper() == "REDO":
self.PrintRedo(self.GetRedo())
elif self.Info["OraInfo"].upper() == "REDOSHIFT":
self.PrintRedoShift(self.GetRedoShift())
elif self.Info["OraInfo"].upper() == "EXECNOW":
self.PrintExecNow(self.GetExecNow())
elif self.Info["OraInfo"].upper() == "INDEXINFO":
self.PrintIndexInfo(self.GetIndexInfo())
elif self.Info["OraInfo"].upper() == "PARTITIONINFO":
self.PrintPartInfo(self.GetPartInfo())
else:
print("Please enter valid value for -i")
exit(6)
def fun_help():
helpStr = """
Name
CheckOracle.py display information which you want to know
Synopsis
CheckOracle.py -H [romote_host] -u [oracle user] -p [oracle password] -s [oracle SID] -i [information] [table_name]
Description
-H Specify a remote host,defaul 127.0.0.1.
-u Specify a oracle user
-p Specify a oracle password
-s Specify a oracle SID
-i Specify a action information what you want to know
Value:
tablespace
asmdiskgroup
redo
redoshift
execnow
indexinfo
Table_name must be specified after indexinfo.
partitioninfo
Table_name must be specified after partitioninfo.
Example
python3 CheckOracle.py -H 127.0.0.1 -u system -p BIIpass01 -s masdb3 -i execnow
python3 CheckOracle.py -H 127.0.0.1 -u system -p BIIpass01 -s masdb3 -i indexinfo dept
python3 CheckOracle.py -H 127.0.0.1 -u system -p BIIpass01 -s masdb3 -i partitioninfo dept
"""
print(helpStr)
if __name__=="__main__":
# print(sys.argv)
# print(len(sys.argv))
if len(sys.argv) > 1: #判断是否传递了参数
for i in range(1,len(sys.argv)): #循环参数下标
# print(sys.argv[i])
if sys.argv[i] == "-H": #判断参数
if re.match("^-",sys.argv[i+1]) == None: #判断当先的下一个参数是否为-开头,-开头不是正取的value值
i += 1 #下标加1
OracleConn.Info["host"] = sys.argv[i] #获取参数值
else:
print("-H.You use this parameter,but no value are specified") #当前参数的下一个参数,不是正取的value值,报错退出
exit(2)
elif sys.argv[i] == "-u":
if re.match("^-", sys.argv[i+1]) == None:
i += 1
OracleConn.Info["OraUser"] = sys.argv[i]
else:
print("-u.You use this parameter,but no value are specified")
exit(2)
elif sys.argv[i] == "-p":
if re.match("^-", sys.argv[i+1]) == None:
i += 1
OracleConn.Info["OraPasswd"] = sys.argv[i]
else:
-
网站标题:python中怎么利用cx_Oracle连接oracle数据库
新闻来源:http://pcwzsj.com/article/gceoco.html