这是在工作中用到的脚本,按设计思路,任何数据源都可以导入。
写得不太完善,欢迎指正。
-- Registry of JDBC endpoints used as data sources and sinks by the
-- import scripts below (looked up by jdbc_id in import_config_incr_*).
-- NOTE(review): credentials are stored in clear text; restrict access
-- to this table or encrypt pass_word.
-- NOTE(review): "if not exists" is redundant right after an
-- unconditional "drop table if exists", but harmless.
drop table if exists `jdbc`;
CREATE TABLE if not exists `jdbc` (
`jdbc_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`jdbc_url` text COMMENT 'jdbc连接',
`user_name` text COMMENT '用户名',
`pass_word` text COMMENT '密码',
`state` int(11) DEFAULT '0' COMMENT '状态',
`desc_name` text COMMENT '注释',
`jdbc_type` varchar(20) DEFAULT NULL COMMENT 'jdbc类型',
PRIMARY KEY (`jdbc_id`)
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8;
-- Per-table key metadata: kudu primary key plus mysql primary key /
-- KEY / index column lists (comma-separated), keyed by table name.
-- Consumed by both shell scripts to build DDL and sqoop --update-key.
drop table if exists `tablename_pk`;
CREATE TABLE if not exists `tablename_pk` (
`table_name` varchar(50) NOT NULL COMMENT '表名',
`kudu_pk_name` varchar(500) DEFAULT NULL COMMENT 'kudu主键名',
`mysql_pk_name` varchar(500) DEFAULT NULL COMMENT 'mysql主键名',
`mysql_key_name` varchar(500) DEFAULT NULL COMMENT 'mysql key名',
`mysql_index_name` varchar(500) DEFAULT NULL COMMENT 'mysql 索引名',
PRIMARY KEY (`table_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Oracle columns that need an explicit hive type mapping
-- (fed into sqoop's --map-column-hive by the import script).
-- NOTE(review): "oralce" is a misspelling of "oracle", but it is
-- load-bearing — the shell scripts query this exact name, so renaming
-- requires a coordinated change in every caller.
drop table if exists `oralce_special_field`;
CREATE TABLE if not exists `oralce_special_field` (
`table_name` varchar(100) NOT NULL COMMENT '表名',
`special_field` varchar(100) NOT NULL COMMENT '特殊字段',
`field_type` varchar(20) DEFAULT NULL COMMENT '字段类型',
PRIMARY KEY (`table_name`,`special_field`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Lookup of staging ("temporary") hive databases; referenced from
-- import_config_incr_* via import_type_id.
-- NOTE(review): import_type_id has no AUTO_INCREMENT and defaults to 0,
-- so ids must be assigned manually by whoever inserts rows.
drop table if exists `import_type`;
CREATE TABLE if not exists `import_type` (
`import_type_id` int(11) NOT NULL DEFAULT '0' COMMENT '主键',
`tmp_table_name` text COMMENT '临时库名',
`desc_name` text COMMENT '注释',
PRIMARY KEY (`import_type_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Library of SQL templates. Templates contain placeholders such as
-- [replace_table_field], [table_pk], [sd_date] that the shell scripts
-- substitute with sed before execution.
drop table if exists `import_str_sql`;
CREATE TABLE if not exists `import_str_sql` (
`str_sql_id` int(11) NOT NULL COMMENT '主键',
`str_sql` text COMMENT 'sql语句',
`desc_name` text COMMENT '注释',
PRIMARY KEY (`str_sql_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- One row per incremental import job: ties a source endpoint/table, the
-- source & sink SQL templates, and the sink endpoint/table together.
-- The scripts select rows WHERE state=1 and matching shell_name, and
-- reset state afterwards (state acts as an enable/dispatch flag).
-- The "_prod" suffix is the database_type argument of the scripts;
-- other environments get their own import_config_incr_<type> table.
drop table if exists `import_config_incr_prod`;
CREATE TABLE if not exists `import_config_incr_prod` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`import_type_id` int(11) NOT NULL DEFAULT '0' COMMENT '临时库id',
`data_source_jdbc_id` int(11) NOT NULL COMMENT '数据源jdbc id',
`data_source_name` text NOT NULL COMMENT '数据源表名',
`source_str_sql_id` int(11) DEFAULT NULL COMMENT '提取数据sql',
`special_field` int(11) DEFAULT '0' COMMENT '是否包含特殊字段',
`data_sink_jdbc_id` int(11) NOT NULL COMMENT '结果表jdbc id',
`data_sink_name` text NOT NULL COMMENT '结果表名',
`sink_str_sql_id` int(11) DEFAULT NULL COMMENT '结果表sql',
`field_type` varchar(20) NOT NULL DEFAULT 'kudu_field_type1' COMMENT '字段类型',
`shell_name` varchar(20) DEFAULT 'oracle_to_kudu' COMMENT '脚本名',
`desc_name` text COMMENT '注释',
`state` int(11) NOT NULL DEFAULT '0' COMMENT '状态',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=267 DEFAULT CHARSET=utf8;
-- Column-level type metadata per table: the mysql type plus two kudu
-- type variants (type1 mirrors the source; type2 is an override).
-- Populated automatically by the scripts' bootstrap path (DESCRIBE /
-- INFORMATION_SCHEMA dumps loaded with mysqlload).
-- The secondary KEY on id exists because the scripts ORDER BY id to
-- keep column order stable; the logical key is (table_name, field_name).
drop table if exists `field_type`;
CREATE TABLE if not exists `field_type` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`table_name` varchar(100) NOT NULL COMMENT '表名',
`field_name` varchar(500) NOT NULL COMMENT '字段名',
`mysql_field_type` varchar(50) DEFAULT NULL COMMENT 'mysql字段类型',
`kudu_field_type1` varchar(50) DEFAULT NULL COMMENT 'kudu字段类型1(跟源表保持一致)',
`kudu_field_type2` varchar(50) DEFAULT NULL COMMENT 'kudu字段类型2',
`kudu_state` int(11) DEFAULT NULL COMMENT 'kudu表中状态',
`state` int(11) DEFAULT NULL COMMENT '状态',
PRIMARY KEY (`table_name`,`field_name`),
KEY `id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10314 DEFAULT CHARSET=utf8;
增量导出到大数据平台的脚本 oracle_to_kudu_incr:
#!/bin/bash
#set -x
# Incremental Oracle -> Kudu import driver.
# Usage: oracle_to_kudu_incr.sh <database_type> <sd_date>
#   database_type : suffix of the import_config_incr_<type> config table
#   sd_date       : business date substituted into the [sd_date] placeholder
#export PGSQL_HOME=/usr/local/pgsql
#export MYSQL_HOME=/usr/local/mysql
export ORACLE_SID=orcl
export ORACLE_OWNER=oracle
export ORACLE_HOME=/home/batsom/instantclient_12_1/
export ORACLE_HOME_LISTNER=$ORACLE_HOME
PATH=$PATH:$HOME/bin:$ORACLE_HOME
# NOTE(review): MYSQL_HOME is not exported in this script (only commented
# out above), so this PATH entry expands to "$HOME/bin:/bin".
PATH=$PATH:$HOME/bin:$MYSQL_HOME/bin
#PATH=$PATH:$HOME/bin:$PGSQL_HOME/bin
export PATH
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${ORACLE_HOME}
export NLS_LANG=AMERICAN_AMERICA.AL32UTF8
export HADOOP_USER_NAME=hdfs
export LANG=zh_CN.gbk
database_type="$1"
sd_date="$2"
######################## concurrency limit ####################
number=""
shell_name="oracle_to_kudu_incr"
sql_config="_data_import"
confile_file_dir="/home/batsom/shell"
shell_dir="/home/batsom/shell"
log_dir="${shell_dir}/log"
mkdir -p ${log_dir}
fun_lib_dir="/home/batsom/shell/inc"
out_file_dir="/home/batsom/out_file"
mkdir -p ${out_file_dir}
in_file_dir="/home/batsom/in_file"
mkdir -p ${in_file_dir}
exec_log="${log_dir}/${shell_name}${database_type}_exec.log"
err_log="${log_dir}/${shell_name}${database_type}_err.log"
######################## custom parameters ####################
#orgid="1"
#orgname="删除表"
######################## read mysql config ####################
# Config file format is one '#'-separated line: host#port#db#user#password.
# NOTE(review): the file is read five times; a single IFS='#' read would do.
mysql_config="${confile_file_dir}/readfile/mysql_config${sql_config}.txt"
mysql_host=`cat ${mysql_config}|awk -F '#' '{print $1}'`
mysql_port=`cat ${mysql_config}|awk -F '#' '{print $2}'`
mysql_dbname=`cat ${mysql_config}|awk -F '#' '{print $3}'`
mysql_user=`cat ${mysql_config}|awk -F '#' '{print $4}'`
mysql_passwd=`cat ${mysql_config}|awk -F '#' '{print $5}'`
######################## read oracle config ###################
#oracle_config="${confile_file_dir}/readfile/oracle_config${sql_config}.txt"
#oracle_host=`cat ${oracle_config}|awk -F '#' '{print $1}'`
#oracle_port=`cat ${oracle_config}|awk -F '#' '{print $2}'`
#oracle_orcl=`cat ${oracle_config}|awk -F '#' '{print $3}'`
#oracle_user=`cat ${oracle_config}|awk -F '#' '{print $4}'`
#oracle_passwd=`cat ${oracle_config}|awk -F '#' '{print $5}'`
##################### read PostgreSQL config ##################
# Password-less login: put the password in ~/.pgpass on the client host
# with permissions 0600. File format:
#hostname:port:database:username:password
###############################################################
# NOTE(review): these commented psql_* lines read ${mysql_config}, not
# ${psql_config} — a copy/paste slip to fix before re-enabling them.
#psql_config="${confile_file_dir}/readfile/psql_config${sql_config}.txt"
#psql_host=`cat ${mysql_config}|awk -F '#' '{print $1}'`
#psql_port=`cat ${mysql_config}|awk -F '#' '{print $2}'`
#psql_user=`cat ${mysql_config}|awk -F '#' '{print $3}'`
###############################################################
# Bail out silently if the shared helper libraries are missing.
if test ! -f ${fun_lib_dir}/error_resolve.sh -o ! -f ${fun_lib_dir}/function.sh ;then
exit
fi
# function.sh presumably defines mysqltxt/mysqlload/oracletxt used below
# — TODO confirm against the library.
source ${fun_lib_dir}/error_resolve.sh
source ${fun_lib_dir}/function.sh
# Crude concurrency guard: count running copies of this script for the
# same database_type and refuse to start a sixth (ps|grep is racy).
exec_num=`ps aux|grep "${confile_file_dir}/${shell_name}.sh"|grep "${database_type}"|grep -v "grep"|wc -l`
if test ${exec_num} -gt 4 ;then
exit
fi
start_time_s=`date +%s`
# Join config row -> staging db, source/sink jdbc endpoints and sql
# templates into one '#'-separated line per enabled (state=1) job.
str_sql="SELECT CONCAT_WS('#',ic.id,it.tmp_table_name,ic.data_source_name,data_source_jdbc.jdbc_url,data_source_jdbc.user_name,data_source_jdbc.pass_word,iss1.str_sql,ic.data_sink_name,data_sink_jdbc.jdbc_url,data_sink_jdbc.user_name,data_sink_jdbc.pass_word,ic.field_type,ic.special_field,iss2.str_sql) AS return_txt FROM import_config_incr_${database_type} ic LEFT JOIN import_type it ON ic.import_type_id=it.import_type_id LEFT JOIN jdbc data_source_jdbc ON ic.data_source_jdbc_id=data_source_jdbc.jdbc_id LEFT JOIN import_str_sql iss1 ON ic.source_str_sql_id=iss1.str_sql_id LEFT JOIN jdbc data_sink_jdbc ON ic.data_sink_jdbc_id=data_sink_jdbc.jdbc_id LEFT JOIN import_str_sql iss2 ON ic.sink_str_sql_id=iss2.str_sql_id WHERE ic.state=1 AND ic.shell_name='oracle_to_kudu_incr' order by ic.id "
return_txt=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
# Nothing enabled for this shell/database_type: done.
if test -z "${return_txt}" ;then
exit
fi
# Main loop: one iteration per enabled config row. Fields below are the
# '#'-separated columns of the CONCAT_WS query above, in order.
while read line
do
    # 1: config row primary key
    id=`echo "${line}"|awk -F '#' '{print $1}'`
    # 2: hive staging database
    hive_database=`echo "${line}"|awk -F '#' '{print $2}'`
    # 3: source table name (upper-cased for Oracle)
    data_source_name=`echo "${line}"|awk -F '#' '{print $3}'| tr a-z A-Z`
    # 4: source jdbc url
    data_source_jdbc=`echo "${line}"|awk -F '#' '{print $4}'`
    # 5: source user name
    source_user_name=`echo "${line}"|awk -F '#' '{print $5}'`
    # 6: source password
    source_pass_word=`echo "${line}"|awk -F '#' '{print $6}'`
    # 7: extraction sql template
    source_str_sql=`echo "${line}"|awk -F '#' '{print $7}'`
    # 8: sink table name (db.table)
    data_sink_name=`echo "${line}"|awk -F '#' '{print $8}'`
    # 9: sink jdbc (impala daemon host)
    data_sink_jdbc=`echo "${line}"|awk -F '#' '{print $9}'`
    # 10: sink user name
    sink_user_name=`echo "${line}"|awk -F '#' '{print $10}'`
    # 11: sink password
    sink_pass_word=`echo "${line}"|awk -F '#' '{print $11}'`
    # 12: which field_type column supplies the hive column types
    field_type_name=`echo "${line}"|awk -F '#' '{print $12}'`
    # 13: whether the table has Oracle-specific fields needing a mapping
    special_field=`echo "${line}"|awk -F '#' '{print $13}'`
    # 14: sink-side sql template
    sink_str_sql=`echo "${line}"|awk -F '#' '{print $14}'`
    sink_database=`echo "${data_sink_name}"|awk -F '.' '{print $1}'`
    sink_tablename=`echo "${data_sink_name}"|awk -F '.' '{print $2}'`
    # Remove any leftover sqoop working directory from a previous run
    # (fs -rmr is deprecated but kept for older Hadoop CLI compatibility).
    hadoop fs -rmr hdfs://nameservice1/user/hdfs/${data_source_name}
    # Build the optional --map-column-hive option BEFORE it is first used.
    # FIX: this used to be computed only after the bootstrap import below,
    # so that import saw a stale value from the previous iteration; the
    # block also appeared twice verbatim — one copy kept.
    if test "${special_field}" = "1" ;then
        str_sql="SELECT GROUP_CONCAT(UPPER(special_field),'=',field_type) txt FROM oralce_special_field WHERE UPPER(table_name)=UPPER('${data_source_name}') GROUP BY table_name"
        return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
        map_column_hive="--map-column-hive ${return_txt_tmp}"
    else
        map_column_hive=""
    fi
    str_sql="SELECT count(*) as txt FROM field_type WHERE UPPER(table_name)=UPPER('${data_source_name}') AND state=1 ORDER BY id"
    echo "${str_sql}"
    return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
    if test "${return_txt_tmp}" = "0" ;then
        # Bootstrap path: no column metadata for this table yet. Create an
        # empty hive table (--where "1=0" imports zero rows), load its
        # DESCRIBE output into field_type, mark the row state=2 and skip.
        echo "insert table field"
        /usr/bin/sqoop import --hive-import --connect jdbc:oracle:thin:@${data_source_jdbc} --username ${source_user_name} --password ${source_pass_word} --table ${data_source_name} --hive-database ${hive_database} --hive-table ${sink_tablename} --hive-drop-import-delims --null-string '\\N' --null-non-string '\\N' ${map_column_hive} -m 1 --where "1=0"
        mv -f ${data_source_name}.java ${log_dir}/
        str_sql="invalidate metadata ${hive_database}.${sink_tablename}"
        echo "${str_sql}"
        /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"
        str_sql="DESCRIBE ${hive_database}.${sink_tablename}"
        /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}" -B --output_delimiter=","|awk -F ',' -v table_name="${data_source_name}" '{print table_name","$1","$2",string,2"}' > ${out_file_dir}/${sink_tablename}.csv
        # FIX: do not clobber ${return_txt} (the outer loop's input).
        return_txt_tmp=`mysqlload "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${out_file_dir}/${sink_tablename}.csv" "data_import.field_type" "," "(table_name,field_name,kudu_field_type1,kudu_field_type2,state)"`
        mv -f ${out_file_dir}/${sink_tablename}.csv /tmp
        # FIX: was "import_config_${database_type}", which is not the table
        # this script reads; use import_config_incr_${database_type} to
        # match the select above and the state=0 update below.
        str_sql="update data_import.import_config_incr_${database_type} set state=2 where id=${id}"
        echo "${str_sql}"
        return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
        continue
    fi
    # Recreate the per-run parquet staging table from scratch.
    str_sql="drop table if exists ${hive_database}.${sink_tablename}_${database_type}"
    /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"
    i=0
    # Kudu primary-key columns (comma-separated) for this table.
    str_sql="SELECT kudu_pk_name FROM tablename_pk WHERE UPPER(table_name)=UPPER('${data_source_name}')"
    return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
    table_pk_tmp="${return_txt_tmp}"
    table_pk_notin=""
    create_kudu_field1=""
    create_hive_field1=""
    create_hive_field_and_type=""
    # Build, from the pk column list: a quoted NOT IN list, a cast-to-string
    # select list, and a hive column-definition list.
    array_field=(${table_pk_tmp//,/ })
    table_pk_one="${array_field[0]}"
    for LINE in "${array_field[@]}"
    do
        if test ${i} -eq 0 ;then
            table_pk_notin="${table_pk_notin}\"${LINE}\""
            create_kudu_field1="${create_kudu_field1}cast(\`${LINE}\` as string) as \`${LINE}\`"
            create_hive_field1="${create_hive_field1}\`${LINE}\` string"
            i=`expr ${i} + 1`
        else
            table_pk_notin="${table_pk_notin},\"${LINE}\""
            create_kudu_field1="${create_kudu_field1},cast(\`${LINE}\` as string) as \`${LINE}\`"
            create_hive_field1="${create_hive_field1},\`${LINE}\` string"
        fi
    done
    # Same list with backticks instead of double quotes (impala identifiers).
    table_pk=`echo "${table_pk_notin}"|sed 's#\"#\`#g'`
    echo "${table_pk}"
    i=0
    create_kudu_field2=""
    import_feild_name=""
    str_sql="SELECT GROUP_CONCAT(upper(field_name) ORDER BY id) as txt FROM field_type WHERE UPPER(table_name)=UPPER('${data_source_name}') AND state=1"
    echo "${str_sql}"
    return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
    import_feild_name="${return_txt_tmp}"
    # Non-pk columns with their hive type (field_type_name selects which
    # kudu_field_type* column to use).
    str_sql="SELECT CONCAT_WS('#',field_name,kudu_field_type1,${field_type_name}) as txt FROM field_type WHERE UPPER(table_name)=UPPER('${data_source_name}') AND field_name not in (${table_pk_notin}) AND state=1 ORDER BY id"
    echo "${str_sql}"
    return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
    while read line_tmp
    do
        table_field_name=`echo "${line_tmp}"|awk -F '#' '{print $1}'| tr a-z A-Z`
        # Field 2 (kudu_field_type1) is not used by this script; only the
        # selected type column (field 3) matters here.
        table_field_type=`echo "${line_tmp}"|awk -F '#' '{print $3}'|sed "s/\r//"`
        if test ${i} -eq 0 ;then
            create_kudu_field2="${create_kudu_field2}\`${table_field_name}\` "
            create_hive_field_and_type="${create_hive_field_and_type}\`${table_field_name}\` ${table_field_type} "
            i=`expr ${i} + 1`
        else
            create_kudu_field2="${create_kudu_field2},\`${table_field_name}\` "
            create_hive_field_and_type="${create_hive_field_and_type},\`${table_field_name}\` ${table_field_type} "
        fi
    done <<< "${return_txt_tmp}"
    upsert_kudu_field="${table_pk_tmp},${create_kudu_field2}"
    create_kudu_field="${create_kudu_field1},${create_kudu_field2}"
    str_sql="create table if not exists ${hive_database}.${sink_tablename}_${database_type}(${create_hive_field1},${create_hive_field_and_type}) stored as parquet"
    echo "${str_sql}"
    /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"
    echo "`date +%Y-%m-%d\ %H:%M`---${data_sink_name}---" >> ${out_file_dir}/${shell_name}_${database_type}.log
    # Source-side row count, logged for reconciliation with the impala count.
    str_sql="select count(*) as text from ${data_source_name}"
    oracle_count=`oracletxt "${data_source_jdbc}" "${source_user_name}" "${source_pass_word}" "${str_sql}"`
    echo "oracle : ${oracle_count}" >> ${out_file_dir}/${shell_name}_${database_type}.log
    # Fill the extraction template's placeholders.
    str_sql=`echo "${source_str_sql}"|sed "s#\[replace_data_sink_table_name\]#${data_source_name}#g"|sed "s#\[replace_data_source_table_name\]#${hive_database}.${sink_tablename}_${database_type}#g"|sed "s#\[replace_table_field\]#${import_feild_name}#g"|sed "s#\[table_pk\]#${table_pk}#g"|sed "s#\[table_field_name\]#${table_pk_one}#g"|sed "s#\[sd_date\]#${sd_date}#g"`
    echo "${str_sql}" >> ${exec_log}
    /usr/bin/sqoop import --connect jdbc:oracle:thin:@${data_source_jdbc} --username ${source_user_name} --password ${source_pass_word} --hcatalog-database ${hive_database} --hcatalog-table ${sink_tablename}_${database_type} --null-string '\\N' --null-non-string '\\N' -m 1 --query "${str_sql}"
    # FIX: capture the exit status directly instead of `echo $?`.
    error=$?
    echo "/usr/bin/sqoop import --connect jdbc:oracle:thin:@${data_source_jdbc} --username ${source_user_name} --password ${source_pass_word} --hcatalog-database ${hive_database} --hcatalog-table ${sink_tablename}_${database_type} --null-string '\\N' --null-non-string '\\N' -m 1 --query \"${str_sql}\""
    # Skip the sink-side steps if the import failed; the row keeps state=1
    # so the next run retries it.
    if test ${error} -ne 0 ;then
        continue
    fi
    mv -f QueryResult.java ${log_dir}/
    str_sql="invalidate metadata ${hive_database}.${sink_tablename}_${database_type};refresh ${hive_database}.${sink_tablename}_${database_type};"
    /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"
    data_sink_name_tmp="${sink_database}.`echo "${sink_tablename}"| tr a-z A-Z`"
    # Fill the sink-side template (upsert from staging into kudu).
    str_sql=`echo "${sink_str_sql}"|sed "s#\[replace_data_sink_table_name\]#${data_sink_name_tmp}#g"|sed "s#\[replace_data_source_table_name\]#${hive_database}.${sink_tablename}_${database_type}#g"|sed "s#\[database_type\]#_${database_type}#g"|sed "s#\[replace_table_field1\]#${upsert_kudu_field}#g"|sed "s#\[replace_table_field2\]#${create_kudu_field}#g"|sed "s#\[replace_table_field\]#${create_kudu_field}#g"|sed "s#\[replace_table_field\]#${import_feild_name}#g"|sed "s#\[table_pk\]#${table_pk}#g"|sed "s#\[table_field_name\]#${table_pk_one}#g"|sed "s#\[sd_date\]#${sd_date}#g"|sed "s#\[replace_sink_tablename\]#${data_source_name}#"`
    echo "${str_sql}"
    /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"
    str_sql="invalidate metadata ${data_sink_name};refresh ${data_sink_name};compute stats ${data_sink_name};"
    /usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}"
    str_sql="select count(*) from ${data_sink_name}"
    impala_count=`/usr/bin/impala-shell -i "${data_sink_jdbc}" -d "${hive_database}" -q "${str_sql}" -B`
    echo "impala : ${impala_count}" >> ${out_file_dir}/${shell_name}_${database_type}.log
    # Success: disable the row until it is re-enabled (state=1) again.
    str_sql="update import_config_incr_${database_type} set state=0 where id=${id}"
    return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
done <<< "${return_txt}"
# Record total wall-clock runtime (seconds) in the exec log.
end_time_s=$(date +%s)
spent_s=$((end_time_s - start_time_s))
echo "${shell_name}.sh `date +%Y-%m-%d\ %H:%M` 处理完毕,耗时:${spent_s}" >> ${exec_log}
#set +x
增量同步到 MySQL 的脚本 hive_to_mysql_incr_v4.sh:
#!/bin/bash
#set -x
# Incremental Hive/Impala -> MySQL export driver.
# Usage: hive_to_mysql_incr_v4.sh <data_source_name> <database_type>
#   data_source_name : db.table to sync (matched against the config table)
#   database_type    : source impala database; also selects the target
#                      mysql database (dwd_data_prod maps to "uzt")
#export PGSQL_HOME=/usr/local/pgsql
export MYSQL_HOME=/usr/local/mysql
#export ORACLE_SID=orcl
#export ORACLE_OWNER=oracle
#export ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1
#export ORACLE_BASE=/u01/app/oracle
#export ORACLE_HOME_LISTNER=$ORACLE_HOME
#PATH=$PATH:$HOME/bin:$ORACLE_HOME/bin
PATH=$PATH:$HOME/bin:$MYSQL_HOME/bin
#PATH=$PATH:$HOME/bin:$PGSQL_HOME/bin
export PATH
#export LD_LIBRARY_PATH="${ORACLE_HOME}/lib"
#export NLS_LANG=AMERICAN_AMERICA.AL32UTF8
export HADOOP_USER_NAME=hdfs
#export LANG=zh_CN.gbk
data_source_name="$1"
database_type="$2"
# Map the impala source database to the mysql target database.
if test "${database_type}" = "dwd_data_prod" ;then
mysql_database="uzt"
else
mysql_database="${database_type}"
fi
# Environment suffix, e.g. "dwd_data_prod" -> "_prod"; used in table and
# log names below.
mysql_type=`echo "${database_type}"|sed "s#dwd_data##g"`
######################## concurrency limit ####################
number=""
shell_name="hive_to_mysql_incr_v4"
sql_config="_data_import"
confile_file_dir="/home/batsom/shell"
shell_dir="/home/batsom/shell"
log_dir="${shell_dir}/log"
mkdir -p ${log_dir}
fun_lib_dir="/home/batsom/shell/inc"
out_file_dir="/home/batsom/out_file"
mkdir -p ${out_file_dir}
in_file_dir="/home/batsom/in_file"
mkdir -p ${in_file_dir}
exec_log="${log_dir}/${shell_name}${mysql_type}_exec.log"
err_log="${log_dir}/${shell_name}${mysql_type}_err.log"
######################## custom parameters ####################
#orgid="1"
#orgname="删除表"
######################## read mysql config ####################
# Config file format is one '#'-separated line: host#port#db#user#password.
mysql_config="${confile_file_dir}/readfile/mysql_config${sql_config}.txt"
mysql_host=`cat ${mysql_config}|awk -F '#' '{print $1}'`
mysql_port=`cat ${mysql_config}|awk -F '#' '{print $2}'`
mysql_dbname=`cat ${mysql_config}|awk -F '#' '{print $3}'`
mysql_user=`cat ${mysql_config}|awk -F '#' '{print $4}'`
mysql_passwd=`cat ${mysql_config}|awk -F '#' '{print $5}'`
######################## read oracle config ###################
#oracle_config="${confile_file_dir}/readfile/oracle_config${sql_config}.txt"
#oracle_host=`cat ${oracle_config}|awk -F '#' '{print $1}'`
#oracle_port=`cat ${oracle_config}|awk -F '#' '{print $2}'`
#oracle_orcl=`cat ${oracle_config}|awk -F '#' '{print $3}'`
#oracle_user=`cat ${oracle_config}|awk -F '#' '{print $4}'`
#oracle_passwd=`cat ${oracle_config}|awk -F '#' '{print $5}'`
##################### read PostgreSQL config ##################
# Password-less login: put the password in ~/.pgpass on the client host
# with permissions 0600. File format:
#hostname:port:database:username:password
###############################################################
# NOTE(review): these commented psql_* lines read ${mysql_config}, not
# ${psql_config} — a copy/paste slip to fix before re-enabling them.
#psql_config="${confile_file_dir}/readfile/psql_config${sql_config}.txt"
#psql_host=`cat ${mysql_config}|awk -F '#' '{print $1}'`
#psql_port=`cat ${mysql_config}|awk -F '#' '{print $2}'`
#psql_user=`cat ${mysql_config}|awk -F '#' '{print $3}'`
###############################################################
# Bail out silently if the shared helper libraries are missing.
if test ! -f ${fun_lib_dir}/error_resolve.sh -o ! -f ${fun_lib_dir}/function.sh ;then
exit
fi
source ${fun_lib_dir}/error_resolve.sh
source ${fun_lib_dir}/function.sh
# Concurrency guard is computed but the limit check is disabled below.
exec_num=`ps aux|grep "${confile_file_dir}/${shell_name}.sh"|grep "${mysql_type}"|grep -v "grep"|wc -l`
#if test ${exec_num} -gt 4 ;then
# exit
#fi
start_time_s=`date +%s`
# Same join as the oracle_to_kudu script, but filtered to the single
# data_source_name passed on the command line.
# NOTE(review): shell_name filter is 'hive_to_mysql_incr' (without the
# _v4 suffix used for logging) — presumably intentional; confirm the
# config rows use that value.
str_sql="SELECT CONCAT_WS('#',ic.id,it.tmp_table_name,ic.data_source_name,data_source_jdbc.jdbc_url,data_source_jdbc.user_name,data_source_jdbc.pass_word,iss1.str_sql,ic.data_sink_name,data_sink_jdbc.jdbc_url,data_sink_jdbc.user_name,data_sink_jdbc.pass_word,ic.field_type,ic.special_field,iss2.str_sql) AS return_txt FROM import_config_incr${mysql_type} ic LEFT JOIN import_type it ON ic.import_type_id=it.import_type_id LEFT JOIN jdbc data_source_jdbc ON ic.data_source_jdbc_id=data_source_jdbc.jdbc_id LEFT JOIN import_str_sql iss1 ON ic.source_str_sql_id=iss1.str_sql_id LEFT JOIN jdbc data_sink_jdbc ON ic.data_sink_jdbc_id=data_sink_jdbc.jdbc_id LEFT JOIN import_str_sql iss2 ON ic.sink_str_sql_id=iss2.str_sql_id WHERE ic.state=1 AND ic.shell_name='hive_to_mysql_incr' and ic.data_source_name='${data_source_name}' "
return_txt=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
if test -z "${return_txt}" ;then
exit
fi
# Main loop: one iteration per matching config row. Fields are the
# '#'-separated columns of the CONCAT_WS query above, in order.
while read line
do
# 1: config row primary key
id=`echo "${line}"|awk -F '#' '{print $1}'`
# 2: hive staging database
hive_database=`echo "${line}"|awk -F '#' '{print $2}'`
# 3: source table name (upper-cased)
data_source_name=`echo "${line}"|awk -F '#' '{print $3}'| tr a-z A-Z`
data_source_name_tmp=`echo "${line}"|awk -F '#' '{print $3}'|awk -F '.' '{print $2}'`
# 4: source jdbc (impala daemon host)
data_source_jdbc=`echo "${line}"|awk -F '#' '{print $4}'`
# 5: source user name
source_user_name=`echo "${line}"|awk -F '#' '{print $5}'`
# 6: source password
source_pass_word=`echo "${line}"|awk -F '#' '{print $6}'`
# 7: extraction sql template
source_str_sql=`echo "${line}"|awk -F '#' '{print $7}'`
# 8: sink table name (db.table)
data_sink_name=`echo "${line}"|awk -F '#' '{print $8}'`
data_sink_name_tmp=`echo "${line}"|awk -F '#' '{print $8}'|awk -F '.' '{print $2}'`
# 9: sink jdbc (mysql host[:port]/db)
data_sink_jdbc=`echo "${line}"|awk -F '#' '{print $9}'`
# 10: sink user name
sink_user_name=`echo "${line}"|awk -F '#' '{print $10}'`
# 11: sink password
sink_pass_word=`echo "${line}"|awk -F '#' '{print $11}'`
# 12: which field_type column to use
field_type_name=`echo "${line}"|awk -F '#' '{print $12}'`
# 13: special-field flag (unused in this script)
special_field=`echo "${line}"|awk -F '#' '{print $13}'`
# 14: sink-side DDL template
sink_str_sql=`echo "${line}"|awk -F '#' '{print $14}'`
source_database=`echo "${data_source_name}"|awk -F '.' '{print $1}'`
source_tablename=`echo "${data_source_name}"|awk -F '.' '{print $2}'`
sink_database=`echo "${data_sink_name}"|awk -F '.' '{print $1}'`
sink_tablename=`echo "${data_sink_name}"|awk -F '.' '{print $2}'`
# Re-point the jdbc url's database part at the mapped mysql_database.
data_sink_jdbc_ip=`echo "${data_sink_jdbc}" |awk -F '/' '{print $1}'`
data_sink_jdbc="${data_sink_jdbc_ip}/${mysql_database}"
# i: first-column flag for comma handling; j: count of rows with missing
# field metadata (non-zero triggers the bootstrap-and-skip path below).
i=0
j=0
create_mysql_field_and_type=""
create_mysql_field=""
mysql_index=""
str_sql="SELECT CONCAT_WS('#',field_name,mysql_field_type) as txt FROM field_type WHERE UPPER(table_name)=UPPER('${sink_tablename}') AND state=1 ORDER BY id"
echo "${str_sql}"
return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
# Build the mysql column list and column+type list; if any column has no
# type metadata, bootstrap field_type from INFORMATION_SCHEMA instead.
while read line_tmp
do
#echo "${line_tmp}"
table_field_name=`echo "${line_tmp}"|awk -F '#' '{print $1}'`
table_field_type=`echo "${line_tmp}"|awk -F '#' '{print $2}'|sed "s/\r//"`
if test "${table_field_name}" = "" -o "${table_field_type}" = "" ;then
echo "insert table field"
export_file_dir="${out_file_dir}/${shell_name}_${data_source_name}.csv"
# NOTE(review): TABLE_SCHEMA is hard-coded to 'dwd_data' here, ignoring
# ${mysql_database} — confirm this is the intended metadata source.
str_sql="SELECT CONCAT_WS(',',table_name,COLUMN_NAME,COLUMN_TYPE,'string','2') txt FROM INFORMATION_SCHEMA.COLUMNS t WHERE upper(TABLE_NAME)=upper('${sink_tablename}') AND TABLE_SCHEMA='dwd_data' ORDER BY ORDINAL_POSITION"
# NOTE(review): these two assignments clobber ${return_txt}; harmless
# only because the outer loop's here-string was expanded at loop entry,
# but a distinct variable name would be safer.
return_txt=`mysqlout "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}" "${export_file_dir}.tmp"`
mv ${export_file_dir}.tmp ${export_file_dir}
return_txt=`mysqlload "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${export_file_dir}" "data_import.field_type" "," "(table_name,field_name,mysql_field_type,kudu_field_type2,state)"`
mv -f ${export_file_dir} /tmp
j=`expr ${j} + 1`
fi
if test ${i} -eq 0 ;then
create_mysql_field_and_type="${create_mysql_field_and_type}${table_field_name} ${table_field_type} "
create_mysql_field="${create_mysql_field}${table_field_name} "
i=`expr ${i} + 1`
else
create_mysql_field_and_type="${create_mysql_field_and_type},${table_field_name} ${table_field_type} "
create_mysql_field="${create_mysql_field},${table_field_name} "
fi
done <<< "${return_txt_tmp}"
#echo "------------------${create_mysql_field_and_type}--------------"
#echo "-----------------------${create_mysql_field}------------------"
echo "--`date +%Y-%m-%d\ %H:%M`-${data_source_name}---" >> ${out_file_dir}/${shell_name}${mysql_type}.log
# Metadata was just bootstrapped: skip the actual sync this run.
# NOTE(review): ${j} is unquoted in a string comparison; quote it or use
# -ne for a numeric test.
if test ${j} != "0" ;then
continue
fi
# Stage the incremental rows into hive via the source template.
str_sql=`echo "${source_str_sql}"|sed "s#\[replace_data_sink_table_name\]#${hive_database}.${source_tablename}${mysql_type}#g"|sed "s#\[replace_data_source_table_name\]#${database_type}.${source_tablename}#g"|sed "s#\[replace_table_field\]#${create_mysql_field}#g"`
echo "${str_sql}"
/usr/bin/impala-shell -i "${data_source_jdbc}" -d "${source_database}" -q "${str_sql}"
str_sql="invalidate metadata ${hive_database}.${source_tablename}${mysql_type};refresh ${hive_database}.${source_tablename}${mysql_type};"
echo "${str_sql}"
/usr/bin/impala-shell -i "${data_source_jdbc}" -d "${source_database}" -q "${str_sql}"
# Source-side row count, logged for reconciliation.
str_sql="select count(*) from ${database_type}.${source_tablename}"
return_txt_tmp=`/usr/bin/impala-shell -i "${data_source_jdbc}" -d "${source_database}" -q "${str_sql}" -B`
echo "impala : ${return_txt_tmp}" >> ${out_file_dir}/${shell_name}${mysql_type}.log
# Fetch mysql key metadata: pk is mandatory (row is skipped without it),
# KEY and index lists are optional.
str_sql="SELECT CONCAT_WS('#',mysql_pk_name,IFNULL(mysql_key_name,''),IFNULL(mysql_index_name,'')) txt FROM tablename_pk WHERE UPPER(table_name)=UPPER('${sink_tablename}')"
return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
mysql_pk_name=`echo "${return_txt_tmp}"|awk -F '#' '{print $1}'`
mysql_key_name=`echo "${return_txt_tmp}"|awk -F '#' '{print $2}'`
mysql_index_name=`echo "${return_txt_tmp}"|awk -F '#' '{print $3}'`
if test "${mysql_pk_name}" ;then
mysql_pkey=",PRIMARY KEY (${mysql_pk_name})"
else
continue
fi
if test "${mysql_key_name}" ;then
mysql_key=",KEY ${sink_tablename} (${mysql_key_name})"
else
mysql_key=""
fi
# Build ",index tbl_col(col)" clauses, one per comma-separated index
# column (each fragment deliberately starts with a comma, like mysql_pkey).
if test "${mysql_index_name}" ;then
i=0
array_field=(${mysql_index_name//,/ })
for LINE in "${array_field[@]}"
do
#echo "------------------"
echo "${LINE}"
if test ${i} -eq 0 ;then
mysql_index=",${mysql_index} index ${sink_tablename}_${LINE}(${LINE})"
i=`expr ${i} + 1`
else
mysql_index="${mysql_index},index ${sink_tablename}_${LINE}(${LINE})"
fi
done
else
mysql_index=""
fi
# Split "host:port/db" back into load credentials for mysqltxt.
mysql_load_dbname=${data_sink_jdbc##*/}
jdbc_url_tmp=${data_sink_jdbc%/*}
mysql_host_port_tmp=${jdbc_url_tmp##*/}
mysql_load_host=`echo "${mysql_host_port_tmp}"|awk -F ':' '{print $1}'`
mysql_load_port=`echo "${mysql_host_port_tmp}"|awk -F ':' '{print $2}'`
mysql_load_user=${sink_user_name}
mysql_load_passwd=${sink_pass_word}
#str_sql="drop table if exists ${data_sink_name};create table ${data_sink_name}(${create_mysql_field_and_type} ${mysql_pkey} ${mysql_key} ${mysql_index}) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4 ;"
# Create/replace the target mysql table via the sink DDL template.
str_sql=`echo "${sink_str_sql}"|sed "s#\[data_sink_name\]#${data_sink_name}#g"|sed "s#\[create_mysql_field_and_type\]#${create_mysql_field_and_type}#g"|sed "s#\[mysql_pkey\]#${mysql_pkey}#g"|sed "s#\[mysql_key\]#${mysql_key}#g"|sed "s#\[mysql_index\]#${mysql_index}#g"`
echo "${str_sql}"
return_txt_tmp=`mysqltxt "${mysql_load_host}" "${mysql_load_port}" "${mysql_load_user}" "${mysql_load_passwd}" "${mysql_load_dbname}" "${str_sql}"`
# Upsert the staged hive rows into mysql.
# NOTE(review): the unquoted "...?useUnicode=true" is subject to shell
# globbing; quoting the whole --connect argument would be safer.
/usr/bin/sqoop export -Dmapreduce.map.memory.mb=2049 --connect jdbc:mysql://${data_sink_jdbc}?useUnicode=true --username ${sink_user_name} --password ${sink_pass_word} --table ${sink_tablename} --update-key ${mysql_pk_name} --update-mode allowinsert --hcatalog-database ${hive_database} --hcatalog-table ${source_tablename}${mysql_type}
mv -f ${sink_tablename}.java ${log_dir}/
echo "/usr/bin/sqoop export -Dmapreduce.map.memory.mb=2049 --connect jdbc:mysql://${data_sink_jdbc}?useUnicode=true --username ${sink_user_name} --password ${sink_pass_word} --table ${sink_tablename} --update-key ${mysql_pk_name} --update-mode allowinsert --hcatalog-database ${hive_database} --hcatalog-table ${source_tablename}${mysql_type}"
# Sink-side row count, logged for reconciliation.
str_sql="select count(*) from ${data_sink_name}"
return_txt_tmp=`mysqltxt "${mysql_load_host}" "${mysql_load_port}" "${mysql_load_user}" "${mysql_load_passwd}" "${mysql_load_dbname}" "${str_sql}"`
echo "mysql : ${return_txt_tmp}" >> ${out_file_dir}/${shell_name}${mysql_type}.log
# NOTE(review): the state-reset update is built but its execution is
# commented out, so rows stay at state=1 and are re-processed every run
# — confirm whether that is intentional.
str_sql="update import_config_incr${mysql_type} set state=0 where id=${id}"
#return_txt_tmp=`mysqltxt "${mysql_host}" "${mysql_port}" "${mysql_user}" "${mysql_passwd}" "${mysql_dbname}" "${str_sql}"`
done <<< "${return_txt}"
# Record total wall-clock runtime (seconds) in the exec log.
end_time_s=$(date +%s)
spent_s=$((end_time_s - start_time_s))
echo "${shell_name}.sh `date +%Y-%m-%d\ %H:%M` 处理完毕,耗时:${spent_s}" >> ${exec_log}
#set +x