网站建设知识
HIVE表同步至MySQL表
2025-07-22 10:02  点击:0

HIVE表同步至MySQL表

问题描述:
现在公司需要将一个集群中某台机器上的HIVE表数据,同步至另一个集群中某台机器的MySQL表中,且这两台机器所在的网段互不连通。如何实现?

问题分析:
由于两台机器不连通,所以无法通过HIVE表所在集群中的Sqoop直接将数据同步至MySQL表。但是HIVE表所在集群有ftp服务,且MySQL表所在集群有一台机器可以访问外网。

解决思路:

Step one:将HIVE表数据导出到HIVE表所在机器本地
Step two:将HIVE表所在机器本地的导出文件压缩打包,通过ftp上传至ftp站点
Step three:在MySQL表所在集群中可以访问外网的机器上,从ftp站点下载压缩文件,并解压到本地
Step four:通过Java JDBC将本地文件中的数据写入MySQL库

具体涉及到的程序:

Step one:导出HIVE表数据到本地,并压缩,HiveToMySQL.sh
#!/bin/bashsource ~/.bash_profiledirectory='/data/X. T. Xiao/HiveToMySQL/'if [ -d $directory ]then  echo "目录存在!"else   echo “目录不存在!”   mkdir $directoryficd $directoryrm -rf .    public static void insertIntoJHTJTable(String fileName) throws SQLException {        Connection conn = null;        String sql;        try {            Class.forName("com.mysql.jdbc.Driver").newInstance();// 动态加载mysql驱动            System.out.println("MySQL driver is prepared OK !");            // 一个Connection代表一个数据库连接            conn = DriverManager.getConnection(                    "jdbc:mysql://17.7.113.133:8096/recommend", "recommend",                    "rcmnd20151027"); // 链接本地MYSQL            // Statement里面带有很多方法,比如executeUpdate可以实现插入,更新和删除等            Statement stmt = conn.createStatement();            sql = "delete from t_live_on_demand";            stmt.executeUpdate(sql);            File file = new File(fileName);            BufferedReader reader = null;            String[] arrs = null;            try {                reader = new BufferedReader(new FileReader(file));                String tempString = null;                // 一次读入一行,直到文件结束                while ((tempString = reader.readLine()) != null) {                    if (tempString.indexOf("[GC") != -1)                        continue;                    arrs = tempString.split("\t");                    sql = "insert into t_live_on_demand"                            + "(cnt_date, click_cnt, play_cnt, user_cnt, recommended_cnt, click_percent, play_percent, per_capita_use, type) "                            + "values " + "(" + arrs[0].replace("-", "") + ","                            + arrs[1] + "," + arrs[2] + "," + arrs[3] + ","                            + arrs[4] + "," + arrs[5] + "," + arrs[6] + ","                            + arrs[7] + "," + arrs[8] + ")";                    // System.out.println(sql);                    stmt.executeUpdate(sql);                }                reader.close();            } catch 
(IOException e) {                e.printStackTrace();            } finally {                if (reader != null) {                    try {                        reader.close();                    } catch (IOException e1) {                    }                }            }        } catch (SQLException e) {            System.out.println("MySQL操作错误");            e.printStackTrace();        } catch (Exception e) {            e.printStackTrace();        } finally {            conn.close();        }    }        public static void insertIntoVIPTSTable(String fileName)            throws SQLException {        Connection conn = null;        String sql;        try {            Class.forName("com.mysql.jdbc.Driver").newInstance();// 动态加载mysql驱动            System.out.println("MySQL driver is prepared OK !");            // 一个Connection代表一个数据库连接            conn = DriverManager.getConnection(                    "jdbc:mysql://17.7.113.133:8096/recommend", "recommend",                    "rcmnd20151027"); // 链接本地MYSQL            // Statement里面带有很多方法,比如executeUpdate可以实现插入,更新和删除等            Statement stmt = conn.createStatement();            sql = "delete from t_vip_film";            stmt.executeUpdate(sql);            File file = new File(fileName);            BufferedReader reader = null;            String[] arrs = null;            try {                reader = new BufferedReader(new FileReader(file));                String tempString = null;                // 一次读入一行,直到文件结束                while ((tempString = reader.readLine()) != null) {                    if (tempString.indexOf("[GC") != -1)                        continue;                    arrs = tempString.split("\t");                    sql = "insert into t_vip_film"                            + "(cnt_date, recommended_cnt, click_cnt, user_cnt, click_percent, per_capita_use, type) "                            + "values " + "(" + arrs[0].replace("-", "") + ","                            + arrs[1] + "," + arrs[2] + "," + 
arrs[3] + ","                            + arrs[4] + "," + arrs[5] +"," + arrs[6] + ")";                    // System.out.println(sql);                    stmt.executeUpdate(sql);                }                reader.close();            } catch (IOException e) {                e.printStackTrace();            } finally {                if (reader != null) {                    try {                        reader.close();                    } catch (IOException e1) {                    }                }            }        } catch (SQLException e) {            System.out.println("MySQL操作错误");            e.printStackTrace();        } catch (Exception e) {            e.printStackTrace();        } finally {            conn.close();        }    }        public static void insertIntoZJTSTable(String fileName) throws SQLException {        Connection conn = null;        String sql;        try {            Class.forName("com.mysql.jdbc.Driver").newInstance();// 动态加载mysql驱动            System.out.println("MySQL driver is prepared OK !");            // 一个Connection代表一个数据库连接            conn = DriverManager.getConnection(                    "jdbc:mysql://17.7.113.133:8096/recommend", "recommend",                    "rcmnd20151027"); // 链接本地MYSQL            // Statement里面带有很多方法,比如executeUpdate可以实现插入,更新和删除等            Statement stmt = conn.createStatement();            sql = "delete from t_binge_watching";            stmt.executeUpdate(sql);            File file = new File(fileName);            BufferedReader reader = null;            String[] arrs = null;            try {                reader = new BufferedReader(new FileReader(file));                String tempString = null;                // 一次读入一行,直到文件结束                while ((tempString = reader.readLine()) != null) {                    if (tempString.indexOf("[GC") != -1)                        continue;                    arrs = tempString.split("\t");                    sql = "insert into t_binge_watching"                
            + "(cnt_date, recommended_cnt, click_cnt, user_cnt, click_percent, per_capita_use, type) "                            + "values " + "(" + arrs[0].replace("-", "") + ","                            + arrs[1] + "," + arrs[2] + "," + arrs[3] + ","                            + arrs[4] + "," + arrs[5] + "," + arrs[6] + ")";                    // System.out.println(sql);                    stmt.executeUpdate(sql);                }                reader.close();            } catch (IOException e) {                e.printStackTrace();            } finally {                if (reader != null) {                    try {                        reader.close();                    } catch (IOException e1) {                    }                }            }        } catch (SQLException e) {            System.out.println("MySQL操作错误");            e.printStackTrace();        } catch (Exception e) {            e.printStackTrace();        } finally {            conn.close();        }    }        public static void insertIntoKJTJTable(String fileName) throws SQLException {        Connection conn = null;        String sql;        try {            Class.forName("com.mysql.jdbc.Driver").newInstance();// 动态加载mysql驱动            System.out.println("MySQL driver is prepared OK !");            // 一个Connection代表一个数据库连接            conn = DriverManager.getConnection(                    "jdbc:mysql://17.7.113.133:8096/recommend", "recommend",                    "rcmnd20151027"); // 链接本地MYSQL            // Statement里面带有很多方法,比如executeUpdate可以实现插入,更新和删除等            Statement stmt = conn.createStatement();            sql = "delete from t_boot_live";            stmt.executeUpdate(sql);            File file = new File(fileName);            BufferedReader reader = null;            String[] arrs = null;            try {                reader = new BufferedReader(new FileReader(file));                String tempString = null;                // 一次读入一行,直到文件结束                while ((tempString = 
reader.readLine()) != null) {                    if (tempString.indexOf("[GC") != -1)                        continue;                    arrs = tempString.split("\t");                    sql = "insert into t_boot_live"                            + "(cnt_date, recommended_cnt, play_cnt, boot_cnt, play_percent, recommended_percent, type) "                            + "values " + "(" + arrs[0].replace("-", "") + ","                            + arrs[1] + "," + arrs[2] + "," + arrs[3] + ","                            + arrs[4] + "," + arrs[5] + "," + arrs[6] + ")";                    // System.out.println(sql);                    stmt.executeUpdate(sql);                }                reader.close();            } catch (IOException e) {                e.printStackTrace();            } finally {                if (reader != null) {                    try {                        reader.close();                    } catch (IOException e1) {                    }                }            }        } catch (SQLException e) {            System.out.println("MySQL操作错误");            e.printStackTrace();        } catch (Exception e) {            e.printStackTrace();        } finally {            conn.close();        }    }        public static void insertIntoQJTJTable(String fileName) throws SQLException {        Connection conn = null;        String sql;        try {            Class.forName("com.mysql.jdbc.Driver").newInstance();// 动态加载mysql驱动            System.out.println("MySQL driver is prepared OK !");            // 一个Connection代表一个数据库连接            conn = DriverManager.getConnection(                    "jdbc:mysql://17.7.113.133:8096/recommend", "recommend",                    "rcmnd20151027"); // 链接本地MYSQL            // Statement里面带有很多方法,比如executeUpdate可以实现插入,更新和删除等            Statement stmt = conn.createStatement();            sql = "delete from t_search_individuation";            stmt.executeUpdate(sql);            File file = new File(fileName);            
BufferedReader reader = null;            String[] arrs = null;            try {                reader = new BufferedReader(new FileReader(file));                String tempString = null;                // 一次读入一行,直到文件结束                while ((tempString = reader.readLine()) != null) {                    if (tempString.indexOf("[GC") != -1)                        continue;                    arrs = tempString.split("\t");                    sql = "insert into t_search_individuation"                            + "(cnt_date, click_cnt, play_cnt, user_cnt, recommended_cnt, click_percent, play_percent, per_capita_use, type) "                            + "values " + "(" + arrs[0].replace("-", "") + ","                            + arrs[1] + "," + arrs[2] + "," + arrs[3] + ","                            + arrs[4] + "," + arrs[5] + "," + arrs[6] + ","                            + arrs[7] + "," + arrs[8] + ")";                    // System.out.println(sql);                    stmt.executeUpdate(sql);                }                reader.close();            } catch (IOException e) {                e.printStackTrace();            } finally {                if (reader != null) {                    try {                        reader.close();                    } catch (IOException e1) {                    }                }            }        } catch (SQLException e) {            System.out.println("MySQL操作错误");            e.printStackTrace();        } catch (Exception e) {            e.printStackTrace();        } finally {            conn.close();        }    }       public static void main(String[] args) throws SQLException {        String fileName1 = "/home/ftpdata/ftp_test/data/xinting.xiao/HiveToMySQL/JHTJ_tongbu.txt";        TxtToMySql.insertIntoJHTJTable(fileName1);        System.out.println("JHTJ is OK!");        String fileName2 = "/home/ftpdata/ftp_test/data/xinting.xiao/HiveToMySQL/KJTJ_tongbu.txt";        TxtToMySql.insertIntoKJTJTable(fileName2);        
System.out.println("KJTJ is OK!");          String fileName3 = "/home/ftpdata/ftp_test/data/xinting.xiao/HiveToMySQL/QJTJ_tongbu.txt";        TxtToMySql.insertIntoQJTJTable(fileName3);        System.out.println("QJTJ is OK!");              String fileName4 = "/home/ftpdata/ftp_test/data/xinting.xiao/HiveToMySQL/VIPTS_tongbu.txt";        TxtToMySql.insertIntoVIPTSTable(fileName4);        System.out.println("VIPTS is OK!");             String fileName5 = "/home/ftpdata/ftp_test/data/xinting.xiao/HiveToMySQL/ZJTS_tongbu.txt";        TxtToMySql.insertIntoZJTSTable(fileName5);        System.out.println("ZJTS is OK!");                  }}
Step five:同步,部署定时任务(下面第一组 crontab 配置在HIVE表所在机器上,第二组配置在MySQL表所在集群中可以访问外网的机器上)

crontab -e
5 1 * * * /bin/bash ./HiveToMySQL.sh
5 3 * * * python ./ftp_tran.py

crontab -e
15 3 * * * /bin/bash ./tarToLocal.sh
35 3 * * * java -jar txtToMySQL.jar

注意问题:
1. hive -e 导出的txt文本结果中可能混有类似 [GC…] 的日志字符,入库前需要过滤处理。
2. 各个路径需要仔细测试。
3. 定时任务的时间间隔要考虑上一步任务的处理时延。