运维的时候有时候需要连接多个mysql, 一般是选用多个窗口来做, 当然也有图形化的客户端软件.
本文使用一个简单的方法: 在mysql里面连接Mysql. 听起来是不是有点怪
1. 伪造一个server
2. 客户端连接到这个server上, 然后转发客户端的流量到真实的server
3. 当接收到特殊语句 比如: /*ddcw_switch*/select root:123456@192.168.101.19:3306;
时, 就连接到指定的新的server, 然后返回OK给客户端.
4. 客户端执行的新的查询就会被 中间件 发往新的server
基本上都是根据之前的脚本修修改改....
指定监听的端口, 和真实是mysql服务器(默认连接的服务), 不需要账号密码, 因为是客户端提供, 这里只负责转发
--skip-ssl 还没实现ssl
-c 传递注释到server
mysql -h192.168.101.21 -P3306 -p123456 --skip-ssl -c
发现数据确实是新服务器的了, 说明这个功能是正常的.
1. 发现能解析mysql连接协议之后, 就能做很多事情了, 比如上次的读写分离, 这次的mysql里面连接mysql, 还可以做流量镜像, 审计等
2. 我是专门使用的一个线程去处理client发来的数据, 再来个线程去处理发给mysql的数据的. 通信使用的是Queue
3. 运维的时候可能有用吧, 毕竟在一个窗口就能连接多个数据库.
待改进: 可以查询多个数据库的结果汇总在一起, 运维就更方便了(就像分布式数据库那样)
testpymysql.py见上一章. 需要修改下client_flag 加个CLIENT_DEPRECATE_EOF, 因为客户端是使用的CLIENT_DEPRECATE_EOF, 我只是懒得去判断了.
mysql_switch.py如下
import struct
from threading import Thread
from multiprocessing import Process, Queue
import socket
import time
import testpymysql
import signal
import os,sys
def btoint(bdata,t='little'):
return int.from_bytes(bdata,t)
def read_pack(rf):
pack_header = rf.read(4)
if len(pack_header) < 4:
print(pack_header,' bye!')
exit(2)
btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header)
pack_size = btrl + (btrh << 16)
bdata = rf.read(pack_size)
if bdata.find(b'/*ddcw_switch*/') != -1:
return pack_header+bdata,True
else:
return pack_header+bdata,False
class mrw(object):
def __init__(self):
self.host = '0.0.0.0'
self.port = 3306
self.w = ('192.168.101.21',3308,)
#专门发信息给客户端...
def sendtoclient(self,client_sock,q):
while True:
bdata = q.get()
print('send: ',btoint(bdata[3:4]),bdata)
client_sock.sendall(bdata)
print('send:fin',btoint(bdata[3:4]),bdata)
def handler_server(self,rf,q):
def sc(*args,**kwargs):
print("i'm died....")
sys.exit(0)
#signal.signal(signal.SIGTERM, sc)
while True:
bdata,status = read_pack(rf)
#print('send: ', btoint(bdata[3:4]), bdata)
q.put(bdata)
def handler(self,conn,addr):
#连接SERVER
sock = socket.create_connection((self.w[0], self.w[1]))
sock.settimeout(None)
srf = sock.makefile('rb')
crf = conn.makefile('rb')
queue = Queue()
t1 = Process(target=self.sendtoclient,args=(conn,queue))
t1.start()
t2 = Process(target=self.handler_server,args=(srf,queue,))
t2.start()
#pid = t2.pid
while True:
bdata,status = read_pack(crf)
print('recived:', btoint(bdata[3:4]), bdata)
if status:
data = bdata[6:].decode().replace('"','').replace("'","").split(';')[0].split()
dl = len(data)
data = data[dl-1]
user_password = data.split('@')[0]
host_port = data.split('@')[1]
nm = testpymysql.mysql()
nm.host = host_port.split(':')[0]
nm.port = host_port.split(':')[1]
nm.user = user_password.split(':')[0]
nm.password = user_password.split(':')[1]
nm.connect()
queue.put(b'\x01\x00\x00\x01\x01')
queue.put(b'\x1c\x00\x00\x02\x03def\x00\x00\x00\x06status\x00\x0c!\x00\x15\x00\x00\x00\xfd\x01\x00\x1f\x00\x00')
queue.put(b'\x08\x00\x00\x03\x07Success')
queue.put(b'\x07\x00\x00\x04\xfe\x00\x00\x02\x00\x00\x00')
sock = nm.sock
t2.terminate()
t2 = Thread(target=self.handler_server,args=(nm.rf,queue))
t2.start()
else:
sock.sendall(bdata)
print('send to server')
def init(self):
socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_server.bind((self.host, self.port))
socket_server.listen(151)
self.socket_server = socket_server
accept_client_thread = Thread(target=self.accept_client,)
accept_client_thread.start()
accept_client_thread.join()
def accept_client(self,):
while True:
conn, addr = self.socket_server.accept()
thread = Process(target=self.handler,args=(conn,addr),)
thread.start()
aa = mrw()
aa.init()