flink和kafka的连接是十分友好的,毕竟是做流式处理的吧。
首先依赖
org.apache.flink:flink-scala_2.12:1.10.1
org.apache.flink:flink-streaming-scala_2.12:1.10.1
org.apache.flink:flink-connector-kafka-0.11_2.12:1.10.1
接着是代码
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.api.scala._

/** Example job: consume string records from a Kafka topic and print them. */
object KafkaSource {

  /**
   * Builds the streaming environment, attaches a Kafka 0.11 consumer for the
   * topic "topic" and prints every record it receives.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Execution environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Parallelism
    streamEnv.setParallelism(4)
    streamEnv.disableOperatorChaining()

    // Kafka configuration; list the cluster's brokers comma-separated,
    // e.g. 172.0.0.101:1111,172.0.0.102:1111
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "*******")
    kafkaProps.setProperty("group.id", "topic")
    kafkaProps.setProperty(
      "key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty(
      "value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("auto.offset.reset", "latest")

    // Receive data from Kafka
    val kafkaConsumer =
      new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), kafkaProps)
    val records = streamEnv.addSource(kafkaConsumer)
    records.print()

    // Run the job
    streamEnv.execute()
  }
}

二、MySQL source
MySQL采用自定义数据源的方式
依赖
mysql:mysql-connector-java:8.0.25
代码
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

/** Example job: repeatedly query a MySQL table through a custom source and print the rows. */
object MysqlSource {

  /**
   * Builds the streaming environment, attaches [[MySQLSource]] and prints every row it emits.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Receive data from MySQL. The element type must be the case class the source
    // actually emits (`class_name`).
    val inputData: DataStream[class_name] = env.addSource(new MySQLSource).setParallelism(1)
    inputData.print()

    env.execute("mysql source")
  }

  /** Row model; create the case class to match the table being read. */
  case class class_name(id: Int, cid: Int)

  /**
   * Custom source that re-runs a fixed query in a loop and emits one
   * [[class_name]] per returned row until it is cancelled.
   */
  class MySQLSource extends RichParallelSourceFunction[class_name] {
    // Written by cancel() and read by the run() loop; @volatile so the write is
    // visible if those run on different threads (pattern recommended by Flink's
    // SourceFunction documentation).
    @volatile var flag: Boolean = true
    var conn: Connection = _
    var stat: PreparedStatement = _

    /** Opens the JDBC connection and prepares the query once, before run() starts. */
    override def open(parameters: Configuration): Unit = {
      // NOTE(review): URL and credentials are hard-coded; consider moving them to configuration.
      conn = DriverManager.getConnection("jdbc:mysql://172.8.10.188:3306/1001_161?characterEncoding=utf-8&serverTimezone=UTC", "siger", "Siger_123")
      val sql = "select id,cid from class_name"
      stat = conn.prepareStatement(sql)
    }

    /**
     * Executes the prepared query repeatedly, emitting each row and pausing
     * 100 ms after every emitted row, until `flag` becomes false.
     *
     * @param sourceContext context used to emit rows downstream
     */
    override def run(sourceContext: SourceFunction.SourceContext[class_name]): Unit = {
      while (flag) {
        val resultSet: ResultSet = stat.executeQuery()
        try {
          // Also check `flag` here so cancellation does not have to wait for the
          // whole result set to be drained.
          while (flag && resultSet.next()) {
            val id = resultSet.getInt("id")
            val cid = resultSet.getInt("cid")
            sourceContext.collect(class_name(id, cid))
            Thread.sleep(100)
          }
        } finally {
          // A new ResultSet is created on every pass; release it before the next query.
          resultSet.close()
        }
      }
    }

    /** Requests the run() loop to stop. */
    override def cancel(): Unit = {
      flag = false
    }

    /** Releases the prepared statement and the connection if they were opened. */
    override def close(): Unit = {
      if (stat != null) stat.close()
      if (conn != null) conn.close()
    }
  }
}
几个月没写博客了,以后还是要坚持写才好。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)