Main error:
SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]
The Kafka connection is configured as follows:
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "awnx1-cdata-pnode06:9092,awnx1-cdata-pnode07:9092,awnx1-cdata-pnode08:9092")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("subscribe", topic)
  .load()
Contents of jaas.conf:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./kafka-dev.keytab"
  principal="dev@kunlun.prod"
  useTicketCache=false
  refreshKrb5Config=true
  debug=true;
};
The spark-submit script:
spark-submit --class kl.stream.StreamWrite2Hudi \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 1g \
  --executor-memory 1g \
  --conf yarn.nodemanager.resource.memory-mb=512m \
  --num-executors 1 \
  --executor-cores 1 \
  --conf spark.yarn.access.hadoopFileSystems=hdfs://klbigdata:8020 \
  --conf spark.yarn.maxAppAttempts=1 \
  --keytab /root/KLSparkHudiDemo/dev.keytab \
  --principal dev@kunlun.prod \
  --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
  --files "/root/KLSparkHudiDemo/core-site.xml,/root/KLSparkHudiDemo/hdfs-site.xml,/root/KLSparkHudiDemo/jaas.conf,/root/KLSparkHudiDemo/kafka-dev.keytab" \
  --jars "/root/KLSparkHudiDemo/lib/hudi-spark-bundle_2.11-0.9.0.jar,/root/KLSparkHudiDemo/lib/original-hudi-spark-bundle_2.11-0.9.0.jar,/root/KLSparkHudiDemo/lib/avro-1.8.2.jar,/root/KLSparkHudiDemo/lib/spark-sql-kafka-0-10_2.11-2.4.7.jar,/root/KLSparkHudiDemo/lib/kafka-clients-2.0.0.3.1.5.0-152.jar,/root/KLSparkHudiDemo/lib/kafka_2.11-2.0.0.3.1.5.0-152.jar,/root/KLSparkHudiDemo/lib/zookeeper-3.4.6.3.1.5.0-152.jar" \
  /root/KLSparkHudiDemo/KLSparkHudiDemo-1.0.jar test1 hdfs://klbigdata:8020/tmp/hudi_test_tb4
Full error log:
11065 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [awnx1-cdata-pnode06:9092, awnx1-cdata-pnode07:9092, awnx1-cdata-pnode08:9092]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = spark-kafka-source-44649c46-f023-4ef0-8c1d-b30521911d96--1773278251-driver-0
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 1
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = kafka
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = SASL_PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Debug is  true storeKey true useTicketCache false useKeyTab true doNotPrompt false ticketCache is null isInitiator true KeyTab is ./kafka-dev.keytab refreshKrb5Config is true principal is dev@kunlun.prod tryFirstPass is false useFirstPass is false storePass is false clearPass is false
Refreshing Kerberos configuration
principal is dev@kunlun.prod
Will use keytab
Commit Succeeded
11149 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
11154 [kafka-kerberos-refresh-thread-dev@kunlun.prod] INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=dev@kunlun.prod]: TGT refresh thread started.
11158 [kafka-kerberos-refresh-thread-dev@kunlun.prod] INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=dev@kunlun.prod]: TGT valid starting at: Tue Mar 01 10:09:34 CST 2022
11158 [kafka-kerberos-refresh-thread-dev@kunlun.prod] INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=dev@kunlun.prod]: TGT expires: Wed Mar 02 10:09:34 CST 2022
11158 [kafka-kerberos-refresh-thread-dev@kunlun.prod] INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=dev@kunlun.prod]: TGT refresh sleeping until: Wed Mar 02 05:24:38 CST 2022
11240 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.0.3.1.5.0-152
11240 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a10a6e16779f1930
11448 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] ERROR org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=spark-kafka-source-44649c46-f023-4ef0-8c1d-b30521911d96--1773278251-driver-0] Connection to node -2 failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
11451 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] WARN org.apache.spark.sql.kafka010.KafkaOffsetReader - Error in attempt 1 getting Kafka offsets:
org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$11.apply(KafkaOffsetReader.scala:217)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$11.apply(KafkaOffsetReader.scala:215)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:358)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:357)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:357)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:356)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:215)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:215)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:325)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:214)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:207)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:202)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.getOrCreateInitialPartitionOffsets(KafkaMicroBatchReader.scala:202)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets$lzycompute(KafkaMicroBatchReader.scala:83)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets(KafkaMicroBatchReader.scala:83)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.setOffsetRange(KafkaMicroBatchReader.scala:87)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply$mcV$sp(MicroBatchExecution.scala:353)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply(MicroBatchExecution.scala:353)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply(MicroBatchExecution.scala:353)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:349)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)
at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:772)
at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
... 65 more
Caused by: KrbException: Server not found in Kerberos database (7) - LOOKING_UP_SERVER
at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:73)
at sun.security.krb5.KrbTgsReq.getReply(KrbTgsReq.java:225)
at sun.security.krb5.KrbTgsReq.sendAndGetCreds(KrbTgsReq.java:236)
at sun.security.krb5.internal.CredentialsUtil.serviceCredsSingle(CredentialsUtil.java:400)
at sun.security.krb5.internal.CredentialsUtil.serviceCreds(CredentialsUtil.java:287)
at sun.security.krb5.internal.CredentialsUtil.serviceCreds(CredentialsUtil.java:263)
at sun.security.krb5.internal.CredentialsUtil.acquireServiceCreds(CredentialsUtil.java:118)
at sun.security.krb5.Credentials.acquireServiceCreds(Credentials.java:490)
at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:695)
... 68 more
Caused by: KrbException: Identifier doesn't match expected value (906)
at sun.security.krb5.internal.KDCRep.init(KDCRep.java:140)
at sun.security.krb5.internal.TGSRep.init(TGSRep.java:65)
at sun.security.krb5.internal.TGSRep.<init>(TGSRep.java:60)
at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:55)
... 76 more
11634 [Driver] INFO org.apache.spark.sql.execution.streaming.CheckpointFileManager - Writing atomically to hdfs://klbigdata/data/hadoop/yarn/local/usercache/dev/appcache/application_1646032758327_0045/container_e42_1646032758327_0045_01_000001/tmp/temporary-dff48b8b-ca1f-46e8-bb90-61b44cfc8700/metadata using temp file hdfs://klbigdata/data/hadoop/yarn/local/usercache/dev/appcache/application_1646032758327_0045/container_e42_1646032758327_0045_01_000001/tmp/temporary-dff48b8b-ca1f-46e8-bb90-61b44cfc8700/.metadata.93ccd1f5-9f4a-4d03-bb08-319ecaa88aad.tmp
11651 [Driver] INFO org.apache.spark.sql.execution.streaming.CheckpointFileManager - Renamed temp file hdfs://klbigdata/data/hadoop/yarn/local/usercache/dev/appcache/application_1646032758327_0045/container_e42_1646032758327_0045_01_000001/tmp/temporary-dff48b8b-ca1f-46e8-bb90-61b44cfc8700/.metadata.93ccd1f5-9f4a-4d03-bb08-319ecaa88aad.tmp to hdfs://klbigdata/data/hadoop/yarn/local/usercache/dev/appcache/application_1646032758327_0045/container_e42_1646032758327_0045_01_000001/tmp/temporary-dff48b8b-ca1f-46e8-bb90-61b44cfc8700/metadata
11663 [Driver] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Starting writeToHudi [id = 287529a2-e0b1-4f36-8711-a2f9a7475848, runId = 81501e42-3f28-4cbb-820d-2112bfb0a612]. Use hdfs://klbigdata/data/hadoop/yarn/local/usercache/dev/appcache/application_1646032758327_0045/container_e42_1646032758327_0045_01_000001/tmp/temporary-dff48b8b-ca1f-46e8-bb90-61b44cfc8700 to store the query checkpoint.
11665 [stream execution thread for writeToHudi [id = 287529a2-e0b1-4f36-8711-a2f9a7475848, runId = 81501e42-3f28-4cbb-820d-2112bfb0a612]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Using MicroBatchReader [KafkaV2[Subscribe[test1]]] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@4b9b6784]
11667 [stream execution thread for writeToHudi [id = 287529a2-e0b1-4f36-8711-a2f9a7475848, runId = 81501e42-3f28-4cbb-820d-2112bfb0a612]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Starting new streaming query.
11667 [stream execution thread for writeToHudi [id = 287529a2-e0b1-4f36-8711-a2f9a7475848, runId = 81501e42-3f28-4cbb-820d-2112bfb0a612]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Stream started from {}
11673 [stream execution thread for writeToHudi [id = 287529a2-e0b1-4f36-8711-a2f9a7475848, runId = 81501e42-3f28-4cbb-820d-2112bfb0a612]] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [awnx1-cdata-pnode06:9092, awnx1-cdata-pnode07:9092, awnx1-cdata-pnode08:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = spark-kafka-source-dfd6e090-e495-4c80-9896-66a944dd07d2--1122458013-driver-0
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Solution: in the kafka.bootstrap.servers option, replace the broker host aliases with the real hostnames. Our Kafka brokers' principals have the form kafka/ip-10-129-24-122.cn-northwest-1.compute.internal@kunlun.prod, i.e. the host part is the machine's hostname "ip-10-129-24-122.cn-northwest-1.compute.internal", not the alias "awnx1-cdata-pnode06". When the client connects via an alias, GSSAPI builds the service principal from that alias, which never matches the broker's real principal, so the KDC lookup fails with "Server not found in Kerberos database".
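The mechanism behind the fix can be sketched as follows: the GSSAPI client constructs the service principal name (SPN) as <service-name>/<host-from-bootstrap.servers>@<realm>, so an alias produces an SPN the KDC has no entry for. A minimal shell illustration, using the service name, realm, alias, and broker hostname from this incident (adapt the values to your own cluster):

```shell
# GSSAPI derives the broker SPN as <service>/<bootstrap-host>@<realm>.
service="kafka"                                               # sasl.kerberos.service.name
realm="kunlun.prod"                                           # cluster realm (from the log)
alias_host="awnx1-cdata-pnode06"                              # alias used in bootstrap.servers
real_host="ip-10-129-24-122.cn-northwest-1.compute.internal"  # hostname in the broker principal

# SPN requested when connecting via the alias -- absent from the KDC:
echo "${service}/${alias_host}@${realm}"
# SPN actually registered for the broker -- only this one can be found:
echo "${service}/${real_host}@${realm}"
```

Comparing the two echoed SPNs makes the "Server not found in Kerberos database (7) - LOOKING_UP_SERVER" message concrete: the first name is what the alias-based config asks the KDC for, and the second is the only name the KDC knows.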