Once Kafka has Kerberos authentication enabled, how do you produce and consume data with Flink? Essentially, you add the authentication-related configuration (jaas.conf, keytab, and so on) to your producer/consumer code. Let's go straight to the code:
Version info:
Flink 1.9.0
Kafka 0.10.0
A quick note: if the dependency versions do not match, you will get an error like the one below, so make sure the versions correspond:
java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread
1. Connecting to a Kerberos-secured cluster is actually quite simple; you need the following three files:
1) The Kerberos server configuration file krb5.conf, which tells the program which KDC to authenticate against:
[libdefaults]
  udp_preference_limit = 1
  renew_lifetime = 3650d
  forwardable = true
  default_realm = CHINAUNICOM
  ticket_lifetime = 3650d
  dns_lookup_realm = false
  dns_lookup_kdc = false
  default_ccache_name = /tmp/krb5cc_%{uid}
  #default_tgs_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5
  #default_tkt_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5

[domain_realm]
  .CHINAUNICOM = CHINAUNICOM

[logging]
  default = FILE:/var/log/krb5kdc.log
  admin_server = FILE:/var/log/kadmind.log
  kdc = FILE:/var/log/krb5kdc.log

[realms]
  CHINAUNICOM = {
    admin_server = master98.hadoop.ljs
    kdc = master98.hadoop.ljs
  }
2) Authentication also requires a jaas.conf file that specifies the login module; one can usually be found under the cluster's conf directory:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="D:\\kafkaSSL\\kafka.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="kafka/salver32.hadoop.unicom@CHINAUNICOM"
  serviceName=kafka;
};
3) The user's login ticket and keytab file; the ticket and keytab themselves are not included here.
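Hard-coding System.setProperty, as the examples below do, is only one way to load these files. The same two standard JVM system properties can instead be passed on the command line when launching the program; the paths shown here are simply the Windows paths used in this article and should be adjusted to your environment:

-Djava.security.krb5.conf=D:\kafkaSSL\krb5.conf
-Djava.security.auth.login.config=D:\kafkaSSL\kafka_client_jaas.conf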
2. To save you from dependency errors, here are the pom.xml dependencies; some of them may be redundant and can simply be removed:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>${httpclient.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.9.0</version>
    <scope>compile</scope>
</dependency>
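The snippet above references ${flink.version}, ${kafka.version}, ${hadoop.version} and ${httpclient.version} without defining them. A possible <properties> block consistent with the versions stated at the top of the article; the Kafka patch release, Hadoop version and httpclient version are assumptions and should match your own cluster:

<properties>
    <flink.version>1.9.0</flink.version>
    <!-- assumed patch release for "Kafka 0.10.0" -->
    <kafka.version>0.10.0.1</kafka.version>
    <!-- assumption: use the Hadoop version of your cluster -->
    <hadoop.version>2.7.3</hadoop.version>
    <!-- assumption -->
    <httpclient.version>4.5.6</httpclient.version>
</properties>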
3. Flink receives messages from a socket and sends them to Kafka; code example:
package com.hadoop.ljs.flink.streaming;
import com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-29 09:31
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.streaming
 */
public class FlinkKafkaKerberosProducer {
    public static final String topic = "topic1";
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String bootstrapServers = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";
    public static final String hostname = "localhost";
    public static final int port = 9000;

    public static void main(String[] args) throws Exception {
        // Set the Kerberos/JAAS configuration on Windows; these can also be passed in as -D JVM options
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);
        /* Get the Flink streaming execution environment */
        final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Receive data from the socket */
        DataStream<String> dataSource = senv.socketTextStream(hostname, port, "\n");
        /* Transform the data here according to your own needs */
        /* The received data can go through arbitrarily complex processing before being sent to Kafka */
        dataSource.addSink(new FlinkKafkaProducer010<String>(topic, new CustomKeyedSerializationSchema(), getProducerProperties()));
        /* Start the job */
        senv.execute("FlinkKafkaProducer");
    }

    public static Properties getProducerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
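The producer above uses com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema, whose source is not shown in this article. A minimal sketch of such a schema, assuming all it needs to do is emit the String value as UTF-8 bytes, with no message key and no per-record topic override:

package com.hadoop.ljs.flink.utils;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import java.nio.charset.StandardCharsets;

/* Hypothetical implementation: value-only serialization, no key, no topic override */
public class CustomKeyedSerializationSchema implements KeyedSerializationSchema<String> {
    @Override
    public byte[] serializeKey(String element) {
        return null;  // no message key
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);  // value as UTF-8 bytes
    }

    @Override
    public String getTargetTopic(String element) {
        return null;  // use the topic passed to FlinkKafkaProducer010
    }
}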
4. Flink connects to Kafka and consumes messages; code example:
package com.hadoop.ljs.flink.streaming;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-29 09:31
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.streaming
 */
public class FlinkKafkaKerberosConsumer {
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String topic = "topic1";
    public static final String consumerGroup = "test_topic1";
    public static final String bootstrapServer = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";

    public static void main(String[] args) throws Exception {
        // Set the Kerberos/JAAS configuration on Windows; these can also be passed in as -D JVM options
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        FlinkKafkaConsumer010<String> consumer010 =
                new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), getConsumerProperties());
        consumer010.setStartFromEarliest();
        // Source: read from Kafka
        DataStream<String> dataStream = env.addSource(consumer010);
        dataStream.print();
        try {
            env.execute();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private static Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", consumerGroup);
        props.put("auto.offset.reset", "earliest");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
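Setting java.security.krb5.conf and java.security.auth.login.config in code is convenient for running from the IDE. When the jobs are submitted to a Flink cluster, Flink can instead perform the Kerberos login itself and hand the credentials to the Kafka connector via flink-conf.yaml. A sketch using Flink's standard Kerberos options, with this article's principal and an assumed Linux keytab path substituted in:

security.kerberos.login.use-ticket-cache: false
# assumed keytab location on the cluster nodes; adjust to your deployment
security.kerberos.login.keytab: /etc/security/keytabs/kafka.service.keytab
security.kerberos.login.principal: kafka/salver32.hadoop.unicom@CHINAUNICOM
security.kerberos.login.contexts: Client,KafkaClient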