按pg数据库表结构自动生成Es索引及映射

9个月前 (08-15)学习357

环境说明

我使用的pg是16.4版本,Es是7.10版本,python3.10

PostgreSQL 16.4, compiled by Visual C++ build 1940, 64-bit

1、安装psycopg2和elasticsearch库

pip install psycopg2

pip install elasticsearch==7.0

装最新版本的elasticsearch库会报错以下错误,最后试的7.0版本可以用。

ImportError: cannot import name ‘Mapping‘ from ‘collections‘

2、编写生成代码

import psycopg2  # 导入psycopg2驱动程序
from elasticsearch import Elasticsearch

# 连接到Elasticsearch
es = Elasticsearch(["http://127.0.0.1:9210"])
index_mapping = {
    'mappings': {
        'properties': {}
    }
}

def getcolumns(table):
    # 创建数据库连接
    cnx = psycopg2.connect(
        host='127.0.0.1',
        port='5432',
        database='gpdb',
        user='postgres',
        password='password'
    )

    # 创建游标
    cursor = cnx.cursor()
    # 执行查询语句
    query = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table}'"
    cursor.execute(query)

    # 获取结果
    result = cursor.fetchall()

    # 关闭游标和连接
    cursor.close()
    return result

def get_es_type(data_type):
    if data_type == 'integer':
        return 'integer'
    elif data_type == 'bigint':
        return 'long'
    elif data_type == 'numeric':
        return 'float'
    elif data_type == 'character varying' or data_type == 'text':
        return 'text'
    elif data_type == 'boolean':
        return 'boolean'
    elif data_type == 'timestamp with time zone' or data_type == 'timestamp without time zone':
        return 'date'
    elif data_type == 'bytea':
        return 'binary'
    else:
        return 'keyword'  # 默认使用keyword类型


# 创建Es索引
def create_index(table_name, index_name):
    result = getcolumns(table_name)

    # 添加字段映射
    for column_name, data_type in result:
        es_type = get_es_type(data_type)
        index_mapping['mappings']['properties'][column_name] = {'type': es_type}

    # 使用indices.exists()方法判断Index是否存在
    if not es.indices.exists(index=index_name):
        es.indices.create(index=index_name, body=index_mapping)
        print(f'索引{index_name}创建成功。')
    else:
        print(f'索引{index_name}已存在,无需创建。')

# 需要创建索引的表
indexlist = [
    {
        'table_name': 'tablename',
        'index_name': 'indexname'
    }
]

for indexinfo in indexlist:
    table_name = indexinfo['table_name']
    index_name = indexinfo['index_name']
    create_index(table_name, index_name)

记得修改连接和表名。

3、执行代码,完事。

分享到: