本文共 1700 字,大约阅读时间需要 5 分钟。
//创建环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSettext = env.readTextFile("文件路径"); //读取文件,对文件中的单词进行计数 DataSet > counts = text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .sum(1); *//使用flink进行存储mysql时,必须使用row的泛型进行存储,这里将处理的结果进行转化 DataSet map = counts.map(new MapFunction
, Row>() { @Override public Row map(Tuple2 t) throws Exception { Row row = new Row(2); row.setField(0, t.f0.getBytes("UTF-8")); row.setField(1, t.f1); return row; } });
//mysql连接 String driverClass = "com.mysql.cj.jdbc.Driver"; String dbUrl = "jdbc:mysql://localhost:3306/数据库?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true "; String userNmae = "用户名"; String passWord = "密码"; String sql = "insert into 表名 (name,age) values (?,?)"; //将数据写入mysql数据库map.output(JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername(driverClass) .setDBUrl(dbUrl) .setUsername(userNmae) .setPassword(passWord) .setQuery(sql) .finish()); //这个必须要执行,否则看不到结果 env.execute("insert into mysql");
1.将flink job部署,将jar包上传即可,可上传名字相同的jar,他们会有不同的id
2.查看jar的id 访问http://localhost:8081/jars 地址即可看到3.使用postman 进行测试entryClass 定位到要执行的job类,programArgs 请求的参数。 返回jobid说明任务执行成功,数据库可看到结果。请求地址使用的是id进行访问的,官网说明使用jar id。请求的参数在官网的api有详细说明。 欢迎评论交流!!!
转载地址:http://faxui.baihongyu.com/