Handling Large Result Sets with PyMySQL

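Recently a project at work needed to pull data from another database and sync it into our own. Because the source database belongs to someone else, we could not use Master-Slave replication directly, so we ended up fetching the data with a Python script.

We chose the PyMySQL client library. Following its official tutorial, the code is fairly simple, roughly as follows: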

import pymysql.cursors

# Connect to the database
connection = pymysql.connect(host='localhost',
                             user='user',
                             password='passwd',
                             db='db',
                             charset='utf8mb4',
                             cursorclass=pymysql.cursors.DictCursor)

try:
    with connection.cursor() as cursor:
        # Read every record in one call
        sql = "SELECT `id`, `password` FROM `users`"
        cursor.execute(sql)
        result = cursor.fetchall()
        print(result)
finally:
    connection.close()

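PyMySQL provides fetchone() and fetchall() for reading the result set. While debugging, though, I found that with the cursor used above it makes no difference which one you call: the entire result set is pulled into client memory in one go. With a large amount of data this consumes a lot of memory, so I started looking for another way, such as fetching rows one by one through iteration.

While reading the source of the DictCursor class, I came across the SSCursor class. Its docstring, quoted below, says it is meant for exactly this large-data case, which is just what I needed.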

class SSCursor(Cursor):
    """
    Unbuffered Cursor, mainly useful for queries that return a lot of data,
    or for connections to remote servers over a slow network.

    Instead of copying every row of data into a buffer, this will fetch
    rows as needed. The upside of this, is the client uses much less memory,
    and rows are returned much faster when traveling over a slow network,
    or if the result set is very big.

    There are limitations, though. The MySQL protocol doesn't support
    returning the total number of rows, so the only way to tell how many rows
    there are is to iterate over every row returned. Also, it currently isn't
    possible to scroll backwards, as only the current row is held in memory.
    """

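The SSCursor class (and its dictionary variant, SSDictCursor) also has a fetchall_unbuffered() method that returns an iterator instead of a list. You can loop over this iterator to fetch rows one at a time, so the whole result set never has to sit in memory at once.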

def fetchall_unbuffered(self):
    """
    Fetch all, implemented as a generator, which isn't to standard,
    however, it doesn't make sense to return everything in a list, as that
    would use ridiculous memory for large result sets.
    """
    return iter(self.fetchone, None)
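The one-liner above relies on the two-argument form of Python's built-in iter(): it keeps calling the given function and stops as soon as the function returns the sentinel value (here None). The sketch below shows this without a database. FakeCursor is a hypothetical stand-in written for this example and is not part of PyMySQL; it only imitates the fetchone() behaviour of returning None when no rows are left.

```python
class FakeCursor:
    """Hypothetical stand-in for an unbuffered cursor (not a PyMySQL class)."""

    def __init__(self, rows):
        self._rows = iter(rows)

    def fetchone(self):
        # Like a DB-API cursor, return None once the rows are exhausted.
        return next(self._rows, None)

    def fetchall_unbuffered(self):
        # Same one-liner as in the SSCursor source quoted above.
        return iter(self.fetchone, None)


cursor = FakeCursor([{"id": 1}, {"id": 2}, {"id": 3}])

# Rows are produced lazily, one fetchone() call per loop step.
seen_ids = [row["id"] for row in cursor.fetchall_unbuffered()]
print(seen_ids)  # prints [1, 2, 3]
```

With a real SSCursor or SSDictCursor, the same pattern would be a plain for loop over cursor.fetchall_unbuffered() after calling execute().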

Usage looks like this:

import pymysql.cursors

src_pc_database = pymysql.connect(host='192.168.39.51', port=5151, user='*', password='*',
                                  db='testdataanalyse',
                                  charset='utf8mb4', cursorclass=pymysql.cursors.SSDictCursor)

try:
    with src_pc_database.cursor() as src_cursor:
        sql = "select * from user"
        src_cursor.execute(sql)
        result = src_cursor.fetchone()

        while result is not None:
            # Process the current row here, then fetch the next one
            result = src_cursor.fetchone()
finally:
    src_pc_database.close()
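For a sync job, handling rows strictly one at a time can mean one write per row on the destination side. A possible middle ground, sketched below, is to read the streamed result in fixed-size batches with the DB-API fetchmany() method. The helper name iter_batches and the FakeCursor class are made up for this illustration, and the batch size is an arbitrary assumption; the fake cursor exists only so the example runs without a database.

```python
def iter_batches(cursor, batch_size=1000):
    """Yield batches of rows from a cursor whose query has already been executed.

    Hypothetical helper: it relies only on fetchmany(), which returns an
    empty sequence once the result set is exhausted.
    """
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


class FakeCursor:
    """Hypothetical stand-in that imitates fetchmany() on an executed cursor."""

    def __init__(self, rows):
        self._rows = list(rows)
        self._pos = 0

    def fetchmany(self, size):
        chunk = self._rows[self._pos:self._pos + size]
        self._pos += len(chunk)
        return chunk


batches = list(iter_batches(FakeCursor(range(5)), batch_size=2))
print(batches)  # prints [[0, 1], [2, 3], [4]]
```

In the script above, this would be called as iter_batches(src_cursor) after src_cursor.execute(sql), and each batch could then be written to the destination with executemany() on a cursor from a separate connection. A separate connection matters here because an unbuffered result has to be read to the end before the same connection can run another query.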