本篇文章小编给大家分享一下Python通过跳板机访问数据库代码方法,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。
什么是跳板机?
跳板机(Jump Server):也称堡垒机,是一类可作为跳板批量操作的远程设备的网络设备,是系统管理员和运维人员常用的操作平台之一。
那么具体是做什么的呢?
现在一些比较大的互联网企业,往往拥有大量的服务器,为了能够统一方便管理,运维人员通过跳板机去管理对应的服务器。我们在访问服务器的时候,先通过登陆跳板机,然后再进入相应服务器。从一定程度上提升了服务器的数据安全性,也提升了服务器的可维护性。
sshtunnel 连接堡垒机
跳板机的本质还是ssh连接,通过paramiko 自己造轮子的话理论也是可以实现的,这边推荐 sshtunnel 这个包,省的自己写了。
理解了跳板机的原理,这边的配置就比较好理解了。拿这张图举例,首先需要先访问跳板机,其实就是最常见的ssh连接,可以是账户密码,也可以是密钥登录的,如果是密钥的话需要在 跳板机的.ssh 目录下的某个许可文件中添加上自己的公钥
最后还需要补充一下 目标主机的ip 和端口。理解了这些,以下的这些参数就知道怎么配置了,
local_bind_address 可以不配置,不配置会随机获取本地的端口号,如果指定的话就会占用本地固定的端口号,推荐配置这一参数。
填写本地的端口是因为这边会将目标服务器ip的端口映射为本地端口,然后访问本地端口就能实现目标端口的访问了。
from sshtunnel import SSHTunnelForwarder server = SSHTunnelForwarder( (host_jump, int(port_jump)), # 跳板机的配置 ssh_pkey=ssh_pk_jump, #密钥的访问方式,如果是密码 ssh_password=??? ssh_username=user_name_jump, remote_bind_address=(host_mysql, int(port_mysql))) # 数据库服务器的配置 local_bind_address=(local_ip, local_port) # 开启本地转发端口 ) server.start() server.close()
有些人习惯用 上下文管理器,看起来更舒服些
from sshtunnel import SSHTunnelForwarder with SSHTunnelForwarder( (host_jump, int(port_jump)), # 跳板机的配置 ssh_pkey=ssh_pk_jump, #密钥的访问方式,如果是密码 ssh_password=??? ssh_username=user_name_jump, remote_bind_address=(host_mysql, int(port_mysql))) # 数据库服务器的配置 local_bind_address=(local_ip, local_port) # 开启本地转发端口 )as server server.start()
连接数据库
建立好通道后就能通过本地映射的端口访问目标的数据库端口了。
常见的mysql 数据库访问 比如 mysqldb,pymysql,mysql.connector,
mysqldb跟 pymysql 差不多,只不过前者适用python2的版本,后者适用python3的版本,mysql.connector是官方推荐的。
pymysql 跟 mysql.connector 还是有一些区别的
我这边使用的是 mysql.connector ,在实践中老是无法连接上,而pymysql却是可以的。
最终终于找到了解决办法。。。
以下是我的mysql.connector 连接数据库的样例。
with SSHTunnelForwarder( (host_jump, int(port_jump)), # 跳板机的配置 ssh_pkey=ssh_pk_jump, #密钥的访问方式,如果是密码 ssh_password=??? ssh_username=user_name_jump, remote_bind_address=(host_mysql, int(port_mysql))) # 数据库服务器的配置 local_bind_address=(local_ip, local_port) # 开启本地转发端口 )as server server.start() conn = mysql.connector.connect(**{'host': '127.0.0.1', 'port': server.local_bind_port, 'user': dbcfg["user"], 'password': dbcfg["password"], 'db': dbcfg["db"], 'charset': dbcfg["charset"], 'collation': dbcfg["collation"], 'use_unicode': dbcfg["use_unicode"], 'use_pure': True}) cursor = conn.cursor(dictionary=True) cursor.execute(sql, params) rows = cursor.fetchall() # conn.commit() cursor.close() conn.close()
这样,数据库就能正常访问啦
代码重构和优化
首先,就上面这样的代码而言,在每次查询都需要进行跳板机的连接,对于短时间突然大量请求的情况,这种是效率极低的。所以最好是能够将建立管道保存下来。
其次,如果同时访问多种数据库的情况,存在有无跳板机的场景,所以最好能够先将所有的数据库都进行连接,将conn保存在字典中,根据要访问的机器调用对应的conn,最后再统一释放。
def get_slave_conn_byProxy(dbcfg): server = SSHTunnelForwarder( eval(dbcfg["addr"]), ssh_username=dbcfg["ssh_username"], ssh_pkey=os.getenv("HOME") + "/.ssh/id_rsa", remote_bind_address=eval(dbcfg["remote_bind_address"]), local_bind_address=eval(dbcfg["local_bind_address"]) # 开启本地转发端口 ) server.start() conn = get_db_connection({'host': '127.0.0.1', 'port': server.local_bind_port, 'user': dbcfg["user"], 'password': dbcfg["password"], 'db': dbcfg["db"], 'charset': dbcfg["charset"], 'collation': dbcfg["collation"], 'use_unicode': dbcfg["use_unicode"], 'use_pure': True}) return server, conn def get_db_connection(dbcfg): # logger.info(f"============ debug db config: {dbcfg}") conn = mysql.connector.connect(**dbcfg) return conn # 通过输入的连接参数判断是否需要通过跳板机的方式 def get_slave_conn(dbcfg): if "addr" in dbcfg: return get_slave_conn_byProxy(dbcfg) else: conn = get_db_connection(dbcfg) return None, conn def read_slave_db_data(sql, conn, params=None, use_pdf=True): cursor = conn.cursor(dictionary=True) cursor.execute(sql, params) rows = cursor.fetchall() # conn.commit() cursor.close() if use_pdf: return pd.DataFrame(rows) else: return rows