Paginating Elasticsearch aggregation queries with Python's elasticsearch_dsl library

Posted 10 months ago (09-04) · Study · 729 views

While doing big-data analysis with Elasticsearch, I needed to count the documents in each category after an aggregation, but the aggregation produced more than 10,000 buckets. After a lot of searching, here is a summary of the approach.

1. First, import the required libraries

from elasticsearch_dsl import connections,Search,Q,A

Q is used to build query conditions, A to build aggregations.
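Combining Q objects with `&` wraps them in a bool/must query. For reference, the three conditions used below serialize to roughly this request body (hand-written here as an illustration, with `'some_class'` as a placeholder value; it is not output captured from the library):

```python
# Approximate serialized form of
# Q('range', ...) & Q('match', ...) & Q('exists', ...)
combined_query = {
    "bool": {
        "must": [
            {"range": {"time": {"gte": "2024-09-03"}}},
            {"match": {"class": "some_class"}},
            {"exists": {"field": "type_id"}},
        ]
    }
}
```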

2. Create the Es connection client

client = connections.create_connection(hosts=settings.ES_HOST, timeout=settings.ES_TIMEOUT)

3. Define a function dedicated to running the Es query; it will be reused later for pagination

def fetch_case_num(case_class, after_key=None):
    s = Search(using=client, index='t_case_party')

    q = Q('range', time={'gte': '2024-09-03'})
    # 'class' is a reserved word in Python, so pass the field via a dict
    q = q & Q('match', **{'class': case_class})
    # Exclude documents whose type_id field is missing
    q = q & Q('exists', field='type_id')
    res = s.query(q)

    # First page: no cursor; subsequent pages: resume from after_key
    if after_key is None:
        composite_agg = A('composite', sources=[
            {'term': A('terms', field='type_id.keyword')}
        ], size=1000)
    else:
        composite_agg = A('composite', sources=[
            {'term': A('terms', field='type_id.keyword')}
        ], size=1000, after=after_key)

    res.aggs.bucket('gender_terms', composite_agg)

    # Execute the query and return the response
    return res.execute()
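The composite aggregation that this function attaches corresponds to roughly the following request body (hand-written here for illustration, not produced by the library):

```python
# Approximate body of the 'gender_terms' composite aggregation
agg_body = {
    "gender_terms": {
        "composite": {
            "size": 1000,
            "sources": [
                {"term": {"terms": {"field": "type_id.keyword"}}}
            ],
            # On later pages the cursor is added, e.g.:
            # "after": {"term": "<last type_id of the previous page>"}
        }
    }
}
```

Unlike a plain terms aggregation, a composite aggregation has no bucket-count ceiling: each response of up to `size` buckets carries an `after_key` cursor for requesting the next page.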

4. Call the function above to run the query and fetch the buckets

# Query the first page
response = fetch_case_num(case_class)

# Get the buckets from the response
buckets = list(response.aggregations.gender_terms.buckets)

# Grab the cursor for the next page (absent on the last page)
after_key = (response.aggregations.gender_terms.after_key
             if 'after_key' in response.aggregations.gender_terms
             else None)

5. Loop while there is a next page, running the query again and appending its buckets

# Loop over all remaining pages
while after_key:
    print(after_key)
    response = fetch_case_num(case_class, after_key)
    if response:
        buckets.extend(response.aggregations.gender_terms.buckets)
        after_key = (response.aggregations.gender_terms.after_key
                     if 'after_key' in response.aggregations.gender_terms
                     else None)
    else:
        break

6. Process the data

results = []
for bucket in buckets:
    result = {}
    result['cus_id'] = bucket.key['term']
    result['val_c'] = bucket.doc_count
    results.append(result)

OK, that's it. The key point is after_key: check whether it exists, and if it does, keep querying from it; once it is gone, you have reached the last page.
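The whole cursor pattern above can be sketched without a live cluster. Here is a minimal stdlib-only simulation in which `fake_search` stands in for `fetch_case_num` and the page contents are invented for illustration:

```python
# Two fake result pages keyed by cursor: None = first page.
PAGES = {
    None: {"buckets": [{"key": {"term": "a"}, "doc_count": 3}],
           "after_key": {"term": "a"}},
    ("term", "a"): {"buckets": [{"key": {"term": "b"}, "doc_count": 5}],
                    "after_key": None},  # last page: no cursor
}

def fake_search(after_key=None):
    """Stand-in for fetch_case_num: return the page at the cursor."""
    key = None if after_key is None else ("term", after_key["term"])
    return PAGES[key]

# First page, then follow the cursor until it disappears
response = fake_search()
buckets = list(response["buckets"])
after_key = response["after_key"]

while after_key:
    response = fake_search(after_key)
    buckets.extend(response["buckets"])
    after_key = response["after_key"]

# Flatten the buckets exactly as in step 6
results = [{"cus_id": b["key"]["term"], "val_c": b["doc_count"]}
           for b in buckets]
```

Running this collects both pages into `results`, mirroring what the real loop does against Elasticsearch.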
