iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >sqoop脚本批量生成
  • 826
分享到

sqoop脚本批量生成

批量脚本sqoop 2023-01-31 07:01:04 826人浏览 安东尼

Python 官方文档:入门教程 => 点击学习

摘要

通过all_tab_columnss字典表生成Hive的建表语句 create or replace view create_sql as--通过all_tab_columnss字典表生成hive的建表语句select own

  • 通过all_tab_columnss字典表生成Hive的建表语句

create or replace view create_sql as
--通过all_tab_columnss字典表生成hive的建表语句
select owner,table_name, case

     when nm = 1 then
      'create table ' || owner || '.' || TABLE_NAME || ' (' ||
      COLUMN_NAME || ' ' || DATA_TYPE || ','
     when np = 1 then
      COLUMN_NAME || ' ' || DATA_TYPE || ') partitioned by (dt string);'
     else
      COLUMN_NAME || ' ' || DATA_TYPE || ','
   end create_sql

from (

    SELECT OWNER,
            TABLE_NAME,
            COLUMN_NAME,
            CASE
              WHEN DATA_TYPE IN ('VARCHAR2','LONG','CLOB','CHAR','NCHAR','BLOB','VARCHAR2') THEN
               'STRING'
              WHEN DATA_TYPE IN ('NUMBER') then
               'DECIMAL' || '(' || DATA_PRECISION || ',' || DATA_SCALE || ')'
              WHEN DATA_TYPE IN ('DATE', 'TIMESTAMP(6)') THEN
               'STRING'
              ELSE
               DATA_TYPE||'(' || DATA_PRECISION || ',' || DATA_SCALE || ')'
            END DATA_TYPE,
            COLUMN_ID,
            NM,
            NP
      FROM (select t.*,
                    row_number() over(partition by owner, TABLE_NAME order by COLUMN_ID) nm,
                    row_number() over(partition by owner, TABLE_NAME order by COLUMN_ID desc) np
             
               from all_tab_columns t
              ))

ORDER BY OWNER, TABLE_NAME, COLUMN_ID;


二、生成sqoop任务的配置表
select
owner||table_name job_name, --任务名称
'' table_type, --表类型dim 维表,fact_i 只增,Fact_iud 增删改
'' partition, --hive表是否是分区表 0 非,1 是
'BS' source_db, --来源业务系统,一般用源表用户名或库名
owner||'.'||table_name source_table, --源表名称
'' datatime_cloumn, --增量时间戳字段
'APPEND' incremental, --增量接入方式,append:增量接入,overwrite:全量接入
'' SPLIT_BY, --并行字段,选择重复数据较少的字段
'APP_NO' row_key, --主键字段
'fz_bs' hive_db, --指定接入的hive库
table_name hive_table, --指定接入的hive表,必须是无数据库名的存表名
'' check_column, --指定接入的源表字段
columns
from (
select owner,table_name,wm_concat(column_name)over(partition by owner,table_name order by column_id) columns,rn from (
select owner,table_name,column_name,column_id,row_number()over(partition by owner,table_name order by column_id desc) rn from all_tab_column
where owner||'_'||table_name in(
'BS_S_FAULT_RPT',
'BS_ARC_S_FAULT_RPT',
'HIS_ARC_S_FAULT_RPT',
'BS_S_FAULT_HANDLE',
'BS_ARC_S_FAULT_HANDLE',
'HIS_ARC_S_FAULT_HANDLE',
'BS_S_RETVISIT',
'BS_ARC_S_RETVISIT',
'HIS_ARC_S_RETVISIT',
'BS_S_95598_WKST_RELA',
'HIS_S_95598_WKST_RELA')
order by owner,table_name,column_id
))
where rn=1
order by owner,table_name;


  • 下面是自动生成sqoop配置任务的的python脚本

import JSON,os
def json_make(input_file='./tablelist'):

#input_file ='./sqoopjob/tablelist'
lines = open(input_file, "r",encoding="utf_8_sig").readlines()
[lines.remove(i) for i in lines if i in ['', '\n']]
lines = [line.strip() for line in lines]

# 获取键值
keys = lines[0].split('\t')
line_num = 1
total_lines = len(lines)
parsed_datas = []
while line_num < total_lines:
        values = lines[line_num].split("\t")
        parsed_datas.append(dict(zip(keys, values)))
        line_num = line_num + 1
json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
output_file = input_file+'.json'

# write to the file
f = open(output_file, "w", encoding="utf-8")
f.write(json_str)
f.close()
print('json格式转换结束!详见%s'%(output_file))

def create_job(tablelist):

if os.path.exists('sqoopjob'):
    os.system('rm -fr sqoopjob/*')
    os.system('mkdir  sqoopjob/hive-bin')
    os.system('mkdir  sqoopjob/impala-bin')
    os.system('mkdir  sqoopjob/partition-bin')
    os.system('mkdir  sqoopjob/job-bin')
else:
    os.mkdir('sqoopjob')
sqoopmeta='jdbc:hsqldb:hsql://localhost:16000/sqoop'
jdbc='jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --passWord Bigdata_2016'
sjf=open('sqoopjob/createjob.sh', "w")
hcf = open('./sqoopjob/hql_corntab.cron', 'w')
imcf=open('./sqoopjob/iMQl_corntab.cron', 'w')
crontabf=open('./sqoopjob/crontab.cron', 'w')
df=open('./sqoopjob/deletejob.sh', 'w')
#scmd=open('./sqoopjob/sqoopcmd.sh', 'w')
#scmd.write('''yesdate=`date -d last-day +%Y-%m-%d`;todday=`date +%Y-%m-%d`''')
#cf=open('./sqoopjob/cron.sh', 'w')
kerboros='kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'

with open(tablelist, 'r') as load_f:
    load_dict = json.load(load_f)
    for job in load_dict:
        if job['table_type'].lower()=='dim' and job['partition']=='0':
            #处理档案表
            hivetruncate='''hive -e"use {hive_db};truncate table {hive_table}_new"\n'''.fORMat(hive_db=job['hive_db'],hive_table=job['hive_table'])
            createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --incremental append \\
                    --split-by {split_by} \\
                    --hive-import \\
                    --hive-drop-import-delims \\
                    --hive-overwrite \\
                    --hive-table {hive_db}.{hive_table}_NEW \\
                    --check-column  {check_column} \\
                    --last-value '2011-01-01 11:00:00' \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    hive_db=job['hive_db'],
                    source_table=job['source_table'].upper(),
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    check_column=job['datatime_cloumn'].upper()
                    ).replace('    ','')

            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')

            deledrop = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --delete {job_name};\n'''.format(job_name=job['job_name'].lower() ).replace('    ','')

            hql = '''hive -e "use {hive_db};
                create table {tmp_table} as
                select {columns} from
                (select t.*,row_number()over(partition by {pattition_by} ORDER BY  t.{order_by} desc) as rn
                from (select * from {table_name}_new uNIOn all select * from {table_name}) t
                ) tm
                where rn =1;
                drop table {table_name};
                alter table {table_name}_TMP rename to {table_name};
                "\n'''.format(
                                    hive_db=job['hive_db'],
                                    tmp_table=job['source_table'].replace('.', '_')+'_tmp',
                                    columns=job['columns'],
                                    pattition_by=','.join(['t.' + cl for cl in job['pattition_by'].split(',')]),
                                    order_by=job['datatime_cloumn'],
                                    table_name=job['source_table'].replace('.', '_'),
                                    ).replace('    ','')
            imql = '''impala-shell -i bigdata-w-001 -q "invalidate metadata;use {hive_db};
                create table {tmp_table} as
                select {columns} from
                (select t.*,row_number()over(partition by {pattition_by} ORDER BY  t.{order_by} desc) as rn
                from (select * from {table_name}_new union all select * from {table_name}) t
                ) tm
                where rn =1;
                drop table {table_name};
                alter table {table_name}_TMP rename to {table_name};
                "\n'''.format(
                                    hive_db=job['hive_db'],
                                    tmp_table=job['source_table'].replace('.', '_') + '_tmp',
                                    columns=job['columns'],
                                    pattition_by=','.join(['t.' + cl for cl in job['pattition_by'].split(',')]),
                                    order_by=job['datatime_cloumn'],
                                    table_name=job['source_table'].replace('.', '_'),
                                ).replace('    ','')
            #print(sjf)
            sjf.write(hivetruncate+createjob)
            df.write(deledrop)
            open('./sqoopjob/hive-bin/' + job['job_name'].lower() + '_hql.sh', 'w').write(kerboros + execjob  + hql)
            open('./sqoopjob/impala-bin/' + job['job_name'].lower() + '_imql.sh', 'w').write(kerboros + execjob  + imql)
            hcf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_hql.sh >>../logs/{job_name}_hql.out 2>&1\n'''.format(
                job_name=job['job_name'].lower()
                    )
                    )
            imcf.write(
            '''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_imql.sh >>../logs/{job_name}_imql.out 2>&1\n'''.format(
                job_name=job['job_name'].lower()))
            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(
                job_name=job['job_name'].lower()).replace('    ','')
            open('./sqoopjob/exec_run.sh', 'a').write(execjob)
        elif job['table_type'].lower()=='fact_iud'and job['partition']=='0':
            # 处理增量事实表,有增删改查的事实表
            hivetruncate = '''hive -e"use {hive_db};truncate table {hive_table}"\n'''.format(
                hive_db=job['hive_db'], hive_table=job['hive_table'])
            createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --split-by {split_by} \\
                    --hive-import \\
                    --hive-drop-import-delims \\
                    --hive-table {hive_db}.{hive_table} \\
                    --delete-target-dir \\
                    -m 2;\n'''.format(
                                            job_name=job['job_name'].lower(),
                                            sqoopmeta=sqoopmeta,
                                            jdbc=jdbc,
                                            hive_db=job['hive_db'],
                                            source_table=job['source_table'].upper(),
                                            split_by=job['split_by'].upper(),
                                            hive_table=job['hive_table'].upper(),
                                        ).replace('    ','')
            sjf.write(hivetruncate+createjob)
            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ',
                                                                                                             '')
            open('./sqoopjob/job-bin/' + job['job_name'].lower() + '.sh', 'w').write(kerboros + execjob)
            open('./sqoopjob/exec_run.sh', 'a').write(execjob)
            crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
        elif job['table_type'].lower()=='fact_i'and job['partition']=='0':
            # 处理在线事实表,只有写入事务的事实表
            hivetruncate = '''hive -e"use {hive_db};truncate table {hive_table}"\n'''.format(
                hive_db=job['hive_db'], hive_table=job['hive_table'])
            createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                                    -- import --connect {jdbc} \\
                                    --table {source_table} \\
                                    --incremental append \\
                                    --split-by {split_by} \\
                                    --hive-import \\
                                    --hive-drop-import-delims \\
                                    --hive-table {hive_db}.{hive_table} \\
                                    --check-column  {check_column} \\
                                    --last-value '2011-01-01 11:00:00' \\
                                    -m 2;\n'''.format(
                                                                job_name=job['job_name'].lower(),
                                                                sqoopmeta=sqoopmeta,
                                                                jdbc=jdbc,
                                                                hive_db=job['hive_db'],
                                                                source_table=job['source_table'].upper(),
                                                                split_by=job['split_by'].upper(),
                                                                hive_table=job['hive_table'].upper(),
                                                                check_column=job['datatime_cloumn'].upper()
                                                            ).replace('    ', '')
            sjf.write(hivetruncate+createjob)
            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ', '')
            open('./sqoopjob/job-bin/' + job['job_name'].lower() + '.sh', 'w').write(kerboros + execjob)
            open('./sqoopjob/exec_run.sh', 'a').write(execjob)
            crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))


        elif job['partition']=='1' and job['table_type'] in ['fact_i','fact_iud']:
            #处理带有where条件查询数据
            shell_cmd='''if [ $# -gt 1 ]; then
                        yesdate=$1
                        today=$2
                        var_len1=`echo ${yesdate} |wc -L`
                        var_len2=`echo ${today} |wc -L`
                        if [[ ${var_len1} != 10 || ${var_len2} != 10 ]];then
                            echo 'vars is wrong'
                            echo 'var input like:2017-01-01 2017-01-21'
                            exit
                        fi
                    else
                        yesdate=`date -d "today -1 day " +%Y-%m-%d`
                        today=`date -d today +%Y-%m-%d`
                    fi

                echo "data:${yesdate}  ${today}"\n'''.replace('    ',' ')
            createjob = '''hive -e"use {hive_db};alter table {hive_table} drop if  exists  partition (dt='$yesdate');
                    alter table {hive_table} add partition(dt='$yesdate') "
                    sqoop  import --connect {jdbc} \\
                    --table {source_table} \\
                    --where "{where} >= date'$yesdate' and {where}<date'$today' " \\
                    --split-by {split_by} \\
                    --hive-import \\
                    --hive-partition-key dt  \\
                    --hive-partition-value  $yesdate \\
                    --hive-drop-import-delims \\
                    --hive-table {hive_db}.{hive_table} \\
                    --delete-target-dir \\
                    -m 2;\n'''.format(
                                            job_name=job['job_name'].lower(),
                                            hive_db=job['hive_db'],
                                            sqoopmeta=sqoopmeta,
                                            jdbc=jdbc,
                                            source_table=job['source_table'].upper(),
                                            where=job['datatime_cloumn'],
                                            split_by=job['split_by'].upper(),
                                            hive_table=job['hive_table'].upper(),
                                        ).replace('    ','')
            #scmd.write(createjob)
            open('./sqoopjob/partition-bin/'+job['job_name'].lower()+'.sh', 'w').write(shell_cmd+createjob)
            open('./sqoopjob/exec_run.sh', 'a').write(shell_cmd+createjob)
            crontabf.write(
                '''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
        elif job['partition'] == '1' and job['table_type'] in ['dim']:
            # 处理带有where条件查询数据
            shell_cmd = '''if [ $# -gt 1 ]; then
                    yesdate=$1
                    today=$2
                    var_len1=`echo ${yesdate} |wc -L`
                    var_len2=`echo ${today} |wc -L`
                    if [[ ${var_len1} != 10 || ${var_len2} != 10 ]];then
                        echo 'vars is wrong'
                        echo 'var input like:2017-01-01 2017-01-21'
                        exit
                    fi
                else
                    yesdate=`date -d "today -1 day " +%Y-%m-%d`
                    today=`date -d today +%Y-%m-%d`
                fi

            echo "data:${yesdate}  ${today}"\n'''.replace('    ',' ')
            createjob = '''hive -e"use {hive_db};alter table {hive_table} drop if  exists  partition (dt='$yesdate');
                       alter table {hive_table} add partition(dt='$yesdate') "
                       sqoop  import --connect {jdbc} \\
                       --table {source_table} \\
                       --split-by {split_by} \\
                       --hive-import \\
                       --hive-partition-key dt  \\
                       --hive-partition-value  $yesdate \\
                       --hive-drop-import-delims \\
                       --hive-table {hive_db}.{hive_table} \\
                       --delete-target-dir \\
                       -m 2;\n'''.format(
                                                job_name=job['job_name'].lower(),
                                                hive_db=job['hive_db'],
                                                sqoopmeta=sqoopmeta,
                                                jdbc=jdbc,
                                                source_table=job['source_table'].upper(),
                                                where=job['datatime_cloumn'],
                                                split_by=job['split_by'].upper(),
                                                hive_table=job['hive_table'].upper(),
                                                ).replace('    ', '')
            #scmd.write(createjob)
            open('./sqoopjob/partition-bin/' + job['job_name'].lower() + '.sh', 'w').write(shell_cmd+createjob)
            open('./sqoopjob/exec_run.sh', 'a').write(shell_cmd+createjob)
            crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}.sh >>../logs/{job_name}.out 2>&1\n'''.format(
                    job_name=job['job_name'].lower()))
sjf.close()
hcf.close()
imcf.close()
df.close()
#cf.close()
print('脚本生成结束,详见./sqoopjob/*')

if __name__=='__main__':

#生成json文件
json_make(input_file='./tablelist')
#生成sqoop脚本
create_job(tablelist='tablelist.json')




import json,os
def json_make(input_file='./HBase_tablelist'):

#input_file ='./sqoopjob/tablelist'
lines = open(input_file, "r",encoding="utf_8_sig").readlines()
[lines.remove(i) for i in lines if i in ['', '\n']]
lines = [line.strip() for line in lines]

# 获取键值
keys = lines[0].split('\t')
line_num = 1
total_lines = len(lines)
parsed_datas = []
while line_num < total_lines:
        values = lines[line_num].split("\t")
        parsed_datas.append(dict(zip(keys, values)))
        line_num = line_num + 1
json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
output_file = input_file+'.json'

# write to the file
f = open(output_file, "w", encoding="utf-8")
f.write(json_str)
f.close()
print('json格式转换结束!详见%s'%(output_file))

def create_job(tablelist):

if os.path.exists('sqoopjob'):
    os.system('rm -fr sqoopjob/*')
    os.system('mkdir  sqoopjob/hbase-bin')
else:
    os.mkdir('sqoopjob')
sqoopmeta='jdbc:hsqldb:hsql://localhost:16000/sqoop'
jdbc='jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
sjf=open('sqoopjob/createjob.sh', "w")
crontabf=open('./sqoopjob/crontab.cron', 'w')
df=open('./sqoopjob/deletejob.sh', 'w')
kerboros='kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'

with open(tablelist, 'r') as load_f:
    load_dict = json.load(load_f)
    for job in load_dict:
        if job['table_type'].lower()=='dim' and job['partition']=='0':
            #处理档案表
            createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --incremental append \\
                    --split-by {split_by} \\
                    --hbase-create-table \\
                    --hbase-table {hive_table} \\
                    --check-column  {check_column} \\
                    --last-value '2011-01-01 11:00:00' \\
                    --hbase-row-key {row_key} \\
                    --column-family cf \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    source_table=job['source_table'].upper(),
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    check_column=job['datatime_cloumn'].upper(),
                    row_key=job['row_key']
                    ).replace('    ','')

            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')

            deledrop = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --delete {job_name};\n'''.format(job_name=job['job_name'].lower() ).replace('    ','')

            createtable=''' CREATE EXTERNAL TABLE {hive_table}({columns})

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:LAST_ANALYZED,cf:SAMPLE_SIZE,cf:CHARACTER_SET_NAME")
TBLPROPERTIES("hbase.table.name" = "{hive_table}", "hbase.mapred.output.outputtable" = "{hive_table}");
'''.format(hive_table=job['hive_table'],

       columns=job['colums'].split(',').join())
            #print(sjf)
            sjf.write(createjob)

            df.write(deledrop)
            open('./sqoopjob/hbase-bin/' + job['job_name'].lower() + '.sh', 'w').write(execjob)
            crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_hql.sh >>../logs/{job_name}_hql.out 2>&1\n'''.format(
                job_name=job['job_name'].lower()))
            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')
            open('./sqoopjob/exec_run.sh', 'a').write(execjob)
        else :pass

sjf.close()
df.close()
#cf.close()
print('脚本生成结束,详见./sqoopjob/*')

if __name__=='__main__':

#生成json文件
json_make(input_file='./hbase_tablelist')
#生成sqoop脚本
create_job(tablelist='./hbase_tablelist.json')


import json,os
def json_make(input_file='./hbase_tablelist'):

#input_file ='./sqoopjob/tablelist'
lines = open(input_file, "r",encoding="utf_8_sig").readlines()
[lines.remove(i) for i in lines if i in ['', '\n']]
lines = [line.strip() for line in lines]

# 获取键值
keys = lines[0].split('\t')
line_num = 1
total_lines = len(lines)
parsed_datas = []
while line_num < total_lines:
        values = lines[line_num].split("\t")
        parsed_datas.append(dict(zip(keys, values)))
        line_num = line_num + 1
json_str = json.dumps(parsed_datas, ensure_ascii=False, indent=4)
output_file = input_file+'.json'

# write to the file
f = open(output_file, "w", encoding="utf-8")
f.write(json_str)
f.close()
print('json格式转换结束!详见%s'%(output_file))

def create_job(tablelist):

if os.path.exists('sqoopjob'):
    os.system('rm -fr sqoopjob/*')
    os.system('mkdir  sqoopjob/hbase-bin')
else:
    os.mkdir('sqoopjob')
sqoopmeta='jdbc:hsqldb:hsql://localhost:16000/sqoop'
jdbc='jdbc:oracle:thin:@10.90.87.35:11521:bdccdb2 --username sqoopuser --password Bigdata_2016'
sjf=open('sqoopjob/createjob.sh', "w")
crontabf=open('./sqoopjob/crontab.cron', 'w')
df=open('./sqoopjob/deletejob.sh', 'w')
createtablef=open('./sqoopjob/createtable.sh', 'w')
kerboros='kinit -kt /keytab/sqoopdba.keytab sqoopdba \n'

with open(tablelist, 'r') as load_f:
    load_dict = json.load(load_f)
    for job in load_dict:
        if job['table_type'].lower()=='dim' and job['partition']=='0':
            #处理档案表
            createjob = '''sqoop  job --create {job_name} --meta-connect {sqoopmeta} \\
                    -- import --connect {jdbc} \\
                    --table {source_table} \\
                    --incremental append \\
                    --split-by {split_by} \\
                    --hbase-create-table \\
                    --hbase-table {hive_table} \\
                    --check-column  {check_column} \\
                    --last-value '2011-01-01 11:00:00' \\
                    --hbase-row-key {row_key} \\
                    --column-family cf \\
                    -m 2;\n'''.format(
                    job_name=job['job_name'].lower(),
                    sqoopmeta=sqoopmeta,
                    jdbc=jdbc,
                    source_table=job['source_table'].upper(),
                    split_by=job['split_by'].upper(),
                    hive_table=job['hive_table'].upper(),
                    check_column=job['datatime_cloumn'].upper(),
                    row_key=job['row_key']
                    ).replace('    ','')

            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')

            deledrop = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --delete {job_name};\n'''.format(job_name=job['job_name'].lower() ).replace('    ','')

            createtable='''hive -e "use{hive_db};CREATE EXTERNAL TABLE {hive_table}_external(key string,{columns_hive})
                        STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
                        WITH SERDEPROPERTIES ('hbase.columns.mapping' = '{columns_hbase}')
                        TBLPROPERTIES('hbase.table.name' = '{hive_table}',
                        'hbase.mapred.output.outputtable' = '{hive_table}')";\n '''.format(
                                    hive_table=job['hive_table'],
                                    hive_db=job['hive_db'],
                                    columns_hive=' string,'.join(job['columns'].split(','))+' string',
                                    columns_hbase=':key,cf:'+',cf:'.join(job['columns'].split(','))
                                    ).replace('    ','')
            sjf.write(createjob)
            createtablef.write(createtable)
            df.write(deledrop)
            open('./sqoopjob/hbase-bin/' + job['job_name'].lower() + '.sh', 'w').write(execjob)
            crontabf.write('''30 02 * * 6 cd /root/hive_import/bin&& ./{job_name}_hql.sh >>../logs/{job_name}_hql.out 2>&1\n'''.format(
                job_name=job['job_name'].lower()))
            execjob = '''sqoop  job --meta-connect jdbc:hsqldb:hsql://localhost:16000/sqoop \
                    --exec {job_name};\n'''.format(job_name=job['job_name'].lower()).replace('    ','')
            open('./sqoopjob/exec_run.sh', 'a').write(execjob)
        else :pass

sjf.close()
df.close()
#cf.close()
print('脚本生成结束,详见./sqoopjob/*')

if __name__=='__main__':

#生成json文件
json_make(input_file='./hbase_tablelist')
#生成sqoop脚本
create_job(tablelist='./hbase_tablelist.json')

--结束END--

本文标题: sqoop脚本批量生成

本文链接: https://www.lsjlt.com/news/192261.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • sqoop脚本批量生成
    通过all_tab_columnss字典表生成hive的建表语句 create or replace view create_sql as--通过all_tab_columnss字典表生成hive的建表语句select own...
    99+
    2023-01-31
    批量 脚本 sqoop
  • 怎么批量生成DDL脚本
    这篇文章主要讲解了“怎么批量生成DDL脚本”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么批量生成DDL脚本”吧! 获取用户...
    99+
    2022-10-18
  • 存储过程批量生成awr脚本
    DECLARE        l_snap_start       NUMBER := 40078;     l_s...
    99+
    2022-10-18
  • 如何自动生成批量执行SQL脚本的批处理
    这篇文章主要介绍如何自动生成批量执行SQL脚本的批处理,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!场景: DBA那边给我导出了所有的存储、函数等等对象的创建脚本,有上千个文件. 现在需要将这些对象创建脚本导入到另外...
    99+
    2023-06-08
  • Shell脚本实现批量生成nagios配置文件
    如果管理的站点和服务器较多的情况下,每次修改配置文件都相当痛苦。因而想到了用shell脚本来批量生成配置文件和配置数据。下面这个脚本是为了批量生成nagios监控配置文件的一个shell脚本程序。其原理是事...
    99+
    2022-06-04
    配置文件 批量 脚本
  • Shell脚本怎么实现批量生成nagios配置文件
    这篇文章主要讲解了“Shell脚本怎么实现批量生成nagios配置文件”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Shell脚本怎么实现批量生成nagios配置文件”吧!如果管理的站点和服...
    99+
    2023-06-09
  • 生成大量测试数据脚本
    生成大量数据来测试: create table t1 (id int not null primary key auto_increment,age int,name varchar(20),cr...
    99+
    2022-10-18
  • 批量管理python脚本
    新出炉的脚本, 有错的地方还望指出,谢谢。 #!/usr/bin/env python # -*- coding: utf-8 -*- # #  Syscloud Operation platform.py #...
    99+
    2023-01-31
    批量 脚本 python
  • python脚本生成html
    #-*- coding: utf-8 -*- from pyh import * CONST_LIST = [ ['1','AAA','100','...
    99+
    2023-01-31
    脚本 python html
  • 有哪些BAT批处理一键生成APK包脚本
    这篇文章主要介绍“有哪些BAT批处理一键生成APK包脚本”,在日常操作中,相信很多人在有哪些BAT批处理一键生成APK包脚本问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”有哪些BAT批处理一键生成APK包脚本...
    99+
    2023-06-08
  • python小脚本——批量将PDF文件转换成图片
    语言:python 3 用法:选择PDF文件所在的目录,点击 确定 后,自动将该目录下的所有PDF转换成单个图片,图片名称为:   pdf文件名.page_序号.jpg 如运行中报错,需要自行根据报错内容按照缺失的库 例如: #安装库pip...
    99+
    2023-09-10
    python
  • wpsvba如何批量生成word
    这篇文章主要介绍“wpsvba如何批量生成word”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“wpsvba如何批量生成word”文章能帮助大家解决问题。wpsvba批量生成word的方法:1、将以...
    99+
    2023-07-04
  • python3 拼接并批量生成sql
    #coding=utf-8 from openpyxl import load_workbook #读取excel的数据 def read_excel(): #打开一个workbook wb = load_workbook...
    99+
    2023-01-31
    批量 sql
  • oracle awr各类生成脚本
    某个实例 ...
    99+
    2022-10-18
  • shell如何批量curl接口脚本
    这篇文章主要介绍了shell如何批量curl接口脚本,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。最近,刚接一需求,如下: DBA会将一些服务规则的数据导出,然后一条条手动去...
    99+
    2023-06-09
  • Linux怎么批量执行redis脚本
    要批量执行redis脚本,你可以使用redis-cli工具和Shell脚本来实现。 以下是一个示例的Shell脚本,用于批量执行re...
    99+
    2023-10-27
    Linux redis
  • python 批量压缩图片的脚本
    目录简介需要 Needs用法 Usage代码实现效果另外一种图片压缩实现方式简介 用Python批量压缩图片,把文件夹或图片直接拖入即可 需要 Needs Python 3 Pillow (用pip install ...
    99+
    2022-06-02
    python 图片压缩 python 批量压缩
  • Shell脚本批量清除Nginx缓存
    前言*随着整个互联网的发展,产生了无数大大小小的网站,随之而来用户对网站UI和速度体验也在日益加强,对企业或者个人来说,赢得用户体验也就意味着赢得先机。 那今天我们在这里针对网站速度这方面来一起交流,提高网...
    99+
    2022-06-04
    缓存 批量 脚本
  • 【工具】批量删除binlog 的脚本
      MySQL DBA 偶尔会遇到因为空间不足,在不能删除data之前,可能先想到的是通过删除binlog 暂时解决空间问题。周末本人也遇到这样的情况,因为不在电脑旁边,找领导出马并且使用手机把命...
    99+
    2022-10-18
  • MySQL8批量修改字符集脚本
    目录1. 批量修改库字符集2. 批量修改表字符集3. 批量修改列字符集从低版本迁移到MySQL 8后,可能由于字符集问题出现 Illegal mix of collations (u...
    99+
    2023-03-24
    MySQL8批量修改字符集 MySQL8 修改字符集
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作