seatunnel各类型数据库同步配置
我是在dolphinscheduler调度器中使用seatunnel,在资源中心中创建对应的同步配置文件。
一、DB2同步数据到MySQL
env {
execution.parallelism = 2
job.mode = "BATCH"
}
source{
Jdbc {
url = "jdbc:db2://127.0.0.18:50000/db"
driver = "com.ibm.db2.jcc.DB2Driver"
connection_check_timeout_sec = 100
user = "user"
password = "password"
query = "select * from table"
}
}
transform {
}
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false"
driver = "com.mysql.cj.jdbc.Driver"
user = "user"
password = "password"
generate_sink_sql = true
database = "test"
table = "table"
primary_keys = ["id"]
}
}
二、SQL Server同步数据到MySQL
env {
job.mode = "BATCH"
checkpoint.interval = 300000
execution.parallelism = 4
read_limit.bytes_per_second=100000000
read_limit.rows_per_second=200000
}
source {
Jdbc {
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
url = "jdbc:sqlserver://127.0.0.1:1433;databaseName=testdb"
user = "user"
password = "password"
query = "select * from table"
}
}
transform {
}
sink {
jdbc {
url = "jdbc:mysql://127.0.0.1:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false"
driver = "com.mysql.cj.jdbc.Driver"
user = "user"
password = "password"
database = "test"
generate_sink_sql = true
table = "table"
batch_size = 10000
}
}
三、MySQL同步到MySQL,字段转换
env {
parallelism = 2
job.mode = "BATCH"
}
source{
jdbc {
url = "jdbc:mysql://127.0.0.1:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "user"
password = "password"
query="SELECT case_uuid,org_uuid,org_short_name,case_name,case_place,party FROM tablename"
result_table_name = table1
}
}
transform {
FieldMapper {
"result_table_name"=table2
"source_table_name"=table1
"field_mapper" {
"case_uuid"="id"
"org_uuid"="case_unit_id"
"org_short_name"="case_unit"
"case_name"="register_number"
"case_place"="check_address"
"party"="legal"
}
}
}
sink {
jdbc {
url = "jdbc:mysql://127.0.0.1:3306/testdb?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false"
driver = "com.mysql.cj.jdbc.Driver"
user = "user"
password = "password"
generate_sink_sql = true
database = "testdb"
table = "tablename"
batch_size = 1000
max_commit_attempts = 3
transaction_timeout_sec = -1
max_retries = 5
auto_commit = "true"
support_upsert_by_query_primary_key_exist = "true"
primary_keys = ["id"]
source_table_name = table2
}
}
四、postgresql同步到postgressql
env {
parallelism = 2
job.mode = "BATCH"
}
source{
jdbc {
url = "jdbc:postgresql://127.0.0.1:5432/gpdb"
driver = "org.postgresql.Driver"
user = "user"
password = "password"
query="SELECT * FROM public.test where coltime > current_date - INTERVAL '30 day' "
}
}
transform {
}
sink {
jdbc {
url = "jdbc:postgresql://192.168.1.31:5432/gpdb"
driver = "org.postgresql.Driver"
user = "user"
password = "password"
connection_check_timeout_sec = 100
table = "public.test"
generate_sink_sql = true
database = "gpdb"
primary_keys = ["id"]
}
}
五、postgresql同步到Es
env {
job.mode = "BATCH"
}
source{
jdbc {
url = "jdbc:postgresql://192.168.1.2:5432/gpdb"
driver = "org.postgresql.Driver"
connection_check_timeout_sec = 100
user = "user"
password = "password"
query = "select * from public.test where date > current_date - INTERVAL '30 day'"
fetch_size = 100000
parallelism = 8
}
}
transform {
}
sink {
Elasticsearch {
hosts = ["192.168.1.6:9210"]
index = "testindex"
primary_keys = ["id"]
}
}