Python-Django脚本实现PostgreSQL到MongoDB千万数据的迁移
需求
从PostgreSQL的一张表OrderRaw(电商订单的原始数据)查询出所有数据, 组织成财务需要的格式存入MongoDB, 这张表有四千万多万条数据
实现
需求很简单哈, 查出来所有的数据, for循环, 一条条处理, 然后存入MongoDB即可, 如下所示:
# MongoDBClient
mdb = MongoService.get_client()
# 获取所有OrderRaw的数据
order_raws = OrderRaw.objects.values('id', 'ptype', 'raw').all()
for order_raw in order_raws:
# 转换成财务数据
finance_data = finance_data(order_raw)
# 存入MongoDB
client.hermes.order_raw.insert(insert_data)
可能不用运行都会发现问题, 我们把问题列出来, 然后挨个去解决
-
取出所有数据会不会内存不够用?
答案是会, 参照这篇文章: Using Django querysets effectively, 而且有可能导致运行的程序宕掉
怎么办? Django queryset有iterator的功能, 迭代器, 分片去取, 而不是一次全部取出来
queryset有很多”学问”, 参照这篇文章可以学到很多
那么代码变成这样:
# MongoDBClient mdb = MongoService.get_client() # 获取所有OrderRaw的数据 order_raws = OrderRaw.objects.values('id', 'ptype', 'raw') for order_raw in order_raws.iterator(): # 转换成财务数据 finance_data = finance_data(order_raw) # 存入MongoDB client.hermes.order_raw.insert(insert_data)
这样我们运行看看, 我们会发现执行后”没有反应”, 大概能够猜到数据量太大, 可能会运行很长时间, 到底需要运行多长时间呢? 我们不知道
-
运行时间未知, 没有任何输出
我们可以采用进度条第三方包progressbar, 来输出时间和进度, 代码变成:
import progressbar # MongoDBClient mdb = MongoService.get_client() # 获取所有OrderRaw的数据 order_raws = OrderRaw.objects.values('id', 'ptype', 'raw') with progressbar.ProgressBar(max_value=order_raws.count()) as bar: count = 0 for order_raw in order_raws.iterator(): count += 1 # 转换成财务数据 finance_data = finance_data(order_raw) # 存入MongoDB client.hermes.order_raw.insert(insert_data) bar.update(count)
这样运行之后, 我们就能看到运行的进度和大概的时间了, 但是我们会发现大概会运行好几十个小时才能完成.
我们怎样优化呢?
-
运行时间太长, 如何优化
优化首先要做的是定位性能问题在哪里, Python有自己的性能分析工具cProfile, 我们可以查询出一定数量的数据来进行测试, 代码变成这样:
import cProfile import progressbar pr = cProfile.Profile() pr.enable() # MongoDBClient mdb = MongoService.get_client() # 获取所有OrderRaw的数据 order_raws = OrderRaw.objects.filter(id__gte=80000, id__lte=90000).values('id', 'ptype', 'raw') with progressbar.ProgressBar(max_value=order_raws.count()) as bar: count = 0 for order_raw in order_raws.iterator(): count += 1 # 转换成财务数据 finance_data = finance_data(order_raw) # 存入MongoDB client.hermes.order_raw.insert(insert_data) bar.update(count) pr.disable() # python3.6不兼容 # sortby = SortKey.TIME # ps = pstats.Stats(pr).sort_stats(sortby) ps = pstats.Stats(pr).sort_stats('time') ps.print_stats(10)
运行之后我们能看到输出, 显示往MongoDB插入数据的时间最久, 如何解决这个问题?
-
往MongoDB写入数据的时间太久
现在是一条一条数据插入, 我们能批量插入吗? 我们一千条存入一次试试:
import cProfile import progressbar pr = cProfile.Profile() pr.enable() # MongoDBClient mdb = MongoService.get_client() # 获取所有OrderRaw的数据 order_raws = OrderRaw.objects.filter(id__gte=80000, id__lte=90000).values('id', 'ptype', 'raw') with progressbar.ProgressBar(max_value=order_raws.count()) as bar: count = 0 insert_data = [] for order_raw in order_raws.iterator(): count += 1 # 转换成财务数据 finance_data = finance_data(order_raw) insert_data.append(finance_data) if count % 1000 == 0: # 存入MongoDB client.hermes.order_raw.insert_many(insert_data) insert_data.clear() bar.update(count) pr.disable() # python3.6不兼容 # sortby = SortKey.TIME # ps = pstats.Stats(pr).sort_stats(sortby) ps = pstats.Stats(pr).sort_stats('time') ps.print_stats(10)
我们还可以调试一次存入的数量, 最后发现2000条是正合适的.
最后我记得一个多小时执行完毕.
总结
细小的工作也有很多隐藏的知识, 上面知识列举了主要的东西, 其中还有很多细节, 我还有用多线程去做, 实现起来有一些复杂, 且销量也没有很大提升, 所以作罢
总结还是: 细节! 深入!