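Aggregation queries against Es with pagination, using Python's elasticsearch_dsl library
While doing big-data analysis with Es, I needed the document count for each category after aggregation, and the aggregation produced more than 10,000 buckets. After a lot of searching, here is a summary of what worked.
1. First, import the required libraries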
from elasticsearch_dsl import connections,Search,Q,A
Q is used for query conditions and A for aggregations.
2. Create the Es connection client
client = connections.create_connection(hosts=settings.ES_HOST, timeout=settings.ES_TIMEOUT)
3. Define a function dedicated to the Es query; the pagination step reuses it
def fetch_case_num(case_class, after_key=None):
    s = Search(using=client, index='t_case_party')
    q = Q('range', time={'gte': '2024-09-03'})
    # class is a reserved word in Python, so pass the field name through a dict
    q = q & Q('match', **{'class': case_class})
    # Exclude documents where type_id is missing
    q = q & Q('exists', field='type_id')
    res = s.query(q)
    # The first request has no after_key; later requests pass the previous page's after_key
    composite_kwargs = {'size': 1000}
    if after_key is not None:
        composite_kwargs['after'] = after_key
    composite_agg = A('composite', sources=[
        {'term': A('terms', field='type_id.keyword')}
    ], **composite_kwargs)
    res.aggs.bucket('gender_terms', composite_agg)
    # Execute the query and return the response
    return res.execute()
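For reference, the aggregation part of the request that the function above sends to Es can be sketched with plain dictionaries. This is only an illustration of the composite aggregation's JSON shape, not the library's internals: build_composite_body is a hypothetical helper, the after value is made up, and the field and aggregation names are copied from the function above.

```python
import json


def build_composite_body(after_key=None, size=1000):
    """Return the aggregation part of the search body as a plain dict.

    Hypothetical helper for illustration; it only mirrors the JSON shape
    of a composite aggregation with a single terms source.
    """
    composite = {
        "size": size,
        "sources": [
            {"term": {"terms": {"field": "type_id.keyword"}}}
        ],
    }
    # "after" is only present from the second page onwards
    if after_key is not None:
        composite["after"] = after_key
    return {"aggs": {"gender_terms": {"composite": composite}}}


first_page = build_composite_body()
# Made-up after_key, shaped like the one Es returns for the "term" source
next_page = build_composite_body(after_key={"term": "some-type-id"})
print(json.dumps(next_page, indent=2))
```

The only difference between the first request and the following ones is the after entry, which is why a single function with an optional after_key parameter is enough.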
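4. Call the function above to run the query and get the buckets
# Query the first page; case_class holds the class value to filter on
response = fetch_case_num(case_class)
# Copy the first page of buckets into a plain list so later pages can be appended
buckets = list(response.aggregations.gender_terms.buckets)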
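5. Loop while there is a next page, running the query function again and appending its buckets
# Read after_key from the first page; it is absent when there are no more buckets
agg = response.aggregations.gender_terms
after_key = agg.after_key if 'after_key' in agg else None
# Loop until every page has been fetched
while after_key:
    print(after_key)
    response = fetch_case_num(case_class, after_key)
    agg = response.aggregations.gender_terms
    buckets.extend(agg.buckets)
    after_key = agg.after_key if 'after_key' in agg else None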
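The pagination loop can be tried without an Es cluster. The sketch below is an assumption-laden stand-in, not the real client: fake_fetch_page imitates a composite aggregation over a small in-memory list (DATA is made-up sample data), and collect_all_buckets is a hypothetical helper that follows after_key until a page comes back without one.

```python
def collect_all_buckets(fetch_page):
    """Follow after_key from page to page and return every bucket.

    fetch_page is any callable that takes the previous after_key (or None)
    and returns a dict with "buckets" and, optionally, "after_key".
    """
    buckets = []
    after_key = None
    while True:
        page = fetch_page(after_key)
        buckets.extend(page["buckets"])
        after_key = page.get("after_key")
        if not after_key:
            break
    return buckets


# Made-up sample data standing in for the aggregated index: (type_id, doc_count)
DATA = [("a", 3), ("b", 1), ("c", 7), ("d", 2), ("e", 5)]


def fake_fetch_page(after_key, size=2):
    """Imitate one composite aggregation page over DATA (not a real Es call)."""
    start = 0
    if after_key is not None:
        keys = [key for key, _ in DATA]
        start = keys.index(after_key["term"]) + 1
    chunk = DATA[start:start + size]
    page = {"buckets": [{"key": {"term": k}, "doc_count": n} for k, n in chunk]}
    # Like Es, only report after_key when the page actually contains buckets
    if chunk:
        page["after_key"] = {"term": chunk[-1][0]}
    return page


all_buckets = collect_all_buckets(fake_fetch_page)
print(len(all_buckets))
```

Here the last request returns an empty page without after_key, which is what ends the loop.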
6. Process the data
results = []
for bucket in buckets:
    result = {}
    result['cus_id'] = bucket.key['term']
    result['val_c'] = bucket.doc_count
    results.append(result)
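The same transformation can also be written as a list comprehension. The sketch below uses made-up sample buckets as plain dictionaries shaped like the raw Es response, so it uses key access instead of the attribute access shown above; buckets_to_rows is a hypothetical helper name.

```python
def buckets_to_rows(buckets):
    """Flatten composite buckets into rows with cus_id and val_c."""
    return [
        {"cus_id": bucket["key"]["term"], "val_c": bucket["doc_count"]}
        for bucket in buckets
    ]


# Made-up sample buckets, shaped like a composite aggregation response
sample_buckets = [
    {"key": {"term": "type-1"}, "doc_count": 12},
    {"key": {"term": "type-2"}, "doc_count": 4},
]

rows = buckets_to_rows(sample_buckets)
print(rows)
```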
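That's it. The key is after_key: check whether the response contains one and, if so, continue the query from it. Once a response comes back with no data, the last page has been reached.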