
Troubleshooting Structured Streaming consumption from a Kerberos-secured Kafka cluster

The main error:

SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]

The Kafka connection configuration is as follows:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "awnx1-cdata-pnode06:9092,awnx1-cdata-pnode07:9092,awnx1-cdata-pnode08:9092")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("subscribe", topic)
  .load()
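For context on the error that follows: with SASL_PLAINTEXT/GSSAPI, the consumer asks the KDC for a service ticket whose principal is built from kafka.sasl.kerberos.service.name and the host part of the broker address it connects to. A minimal sketch of how that principal is assembled (not the Kafka client's actual code; the realm is assumed to come from krb5.conf):

// Sketch only: how the service principal requested from the KDC is roughly formed.
val serviceName = "kafka"                 // kafka.sasl.kerberos.service.name
val brokerHost  = "awnx1-cdata-pnode06"   // host part of kafka.bootstrap.servers
val realm       = "kunlun.prod"           // assumed default realm from krb5.conf
val servicePrincipal = s"$serviceName/$brokerHost@$realm"
// => kafka/awnx1-cdata-pnode06@kunlun.prod -- this exact principal must exist in the
// Kerberos database, otherwise the TGS request fails with LOOKING_UP_SERVER.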

The contents of jaas.conf are as follows:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./kafka-dev.keytab"
  principal="dev@kunlun.prod"
  useTicketCache=false
  refreshKrb5Config=true
  debug=true;
};
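As an aside, with the kafka-clients 2.0 jar used here the same login module can also be passed inline through the consumer option kafka.sasl.jaas.config instead of shipping a separate jaas.conf and setting java.security.auth.login.config. A hedged sketch, untested in this setup:

// Alternative sketch (untested here): inline JAAS config via kafka.sasl.jaas.config.
// The relative keytab path assumes --files has shipped kafka-dev.keytab to the container.
val dfInlineJaas = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "awnx1-cdata-pnode06:9092")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.sasl.jaas.config",
    """com.sun.security.auth.module.Krb5LoginModule required
      |useKeyTab=true storeKey=true
      |keyTab="./kafka-dev.keytab"
      |principal="dev@kunlun.prod";""".stripMargin)
  .option("subscribe", topic)
  .load()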

The spark-submit script is as follows:

spark-submit --class kl.stream.StreamWrite2Hudi \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 1g \
  --executor-memory 1g \
  --conf yarn.nodemanager.resource.memory-mb=512m \
  --num-executors 1 \
  --executor-cores 1 \
  --conf spark.yarn.access.hadoopFileSystems=hdfs://klbigdata:8020 \
  --conf spark.yarn.maxAppAttempts=1 \
  --keytab /root/KLSparkHudiDemo/dev.keytab \
  --principal dev@kunlun.prod \
  --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
  --files "/root/KLSparkHudiDemo/core-site.xml,/root/KLSparkHudiDemo/hdfs-site.xml,/root/KLSparkHudiDemo/jaas.conf,/root/KLSparkHudiDemo/kafka-dev.keytab" \
  --jars "/root/KLSparkHudiDemo/lib/hudi-spark-bundle_2.11-0.9.0.jar,/root/KLSparkHudiDemo/lib/original-hudi-spark-bundle_2.11-0.9.0.jar,/root/KLSparkHudiDemo/lib/avro-1.8.2.jar,/root/KLSparkHudiDemo/lib/spark-sql-kafka-0-10_2.11-2.4.7.jar,/root/KLSparkHudiDemo/lib/kafka-clients-2.0.0.3.1.5.0-152.jar,/root/KLSparkHudiDemo/lib/kafka_2.11-2.0.0.3.1.5.0-152.jar,/root/KLSparkHudiDemo/lib/zookeeper-3.4.6.3.1.5.0-152.jar" \
  /root/KLSparkHudiDemo/KLSparkHudiDemo-1.0.jar test1 hdfs://klbigdata:8020/tmp/hudi_test_tb4
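One detail worth noting in this script: --files ships core-site.xml, hdfs-site.xml, jaas.conf and kafka-dev.keytab into the working directory of the driver and executor containers, which is why the relative paths ./jaas.conf and ./kafka-dev.keytab work in cluster mode. A small sanity check (a hypothetical helper, not part of the original job) that can be dropped into the driver code to confirm the files actually arrived:

// Hypothetical sanity check: verify that the artifacts distributed via --files are present
// in the container's current working directory before the Kafka source is created.
import java.nio.file.{Files, Paths}

Seq("jaas.conf", "kafka-dev.keytab").foreach { name =>
  val ok = Files.exists(Paths.get(name))
  println(s"--files artifact '$name' present in working dir: $ok")
}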

The full error log is as follows:

11065 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [awnx1-cdata-pnode06:9092, awnx1-cdata-pnode07:9092, awnx1-cdata-pnode08:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = spark-kafka-source-44649c46-f023-4ef0-8c1d-b30521911d96--1773278251-driver-0
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 1
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = kafka
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SASL_PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Debug is  true storeKey true useTicketCache false useKeyTab true doNotPrompt false ticketCache is null isInitiator true KeyTab is ./kafka-dev.keytab refreshKrb5Config is true principal is dev@kunlun.prod tryFirstPass is false useFirstPass is false storePass is false clearPass is false
Refreshing Kerberos configuration
principal is dev@kunlun.prod
Will use keytab
Commit Succeeded 

11149 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] INFO  org.apache.kafka.common.security.authenticator.AbstractLogin  - Successfully logged in.
11154 [kafka-kerberos-refresh-thread-dev@kunlun.prod] INFO  org.apache.kafka.common.security.kerberos.KerberosLogin  - [Principal=dev@kunlun.prod]: TGT refresh thread started.
11158 [kafka-kerberos-refresh-thread-dev@kunlun.prod] INFO  org.apache.kafka.common.security.kerberos.KerberosLogin  - [Principal=dev@kunlun.prod]: TGT valid starting at: Tue Mar 01 10:09:34 CST 2022
11158 [kafka-kerberos-refresh-thread-dev@kunlun.prod] INFO  org.apache.kafka.common.security.kerberos.KerberosLogin  - [Principal=dev@kunlun.prod]: TGT expires: Wed Mar 02 10:09:34 CST 2022
11158 [kafka-kerberos-refresh-thread-dev@kunlun.prod] INFO  org.apache.kafka.common.security.kerberos.KerberosLogin  - [Principal=dev@kunlun.prod]: TGT refresh sleeping until: Wed Mar 02 05:24:38 CST 2022
11240 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 2.0.0.3.1.5.0-152
11240 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId : a10a6e16779f1930
11448 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] ERROR org.apache.kafka.clients.NetworkClient  - [Consumer clientId=consumer-1, groupId=spark-kafka-source-44649c46-f023-4ef0-8c1d-b30521911d96--1773278251-driver-0] Connection to node -2 failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
11451 [stream execution thread for [id = 8b900cdf-91ae-4794-8141-b519234c4415, runId = 413a45ea-7cd6-4a25-a2cb-6b7f1363ea08]] WARN  org.apache.spark.sql.kafka010.KafkaOffsetReader  - Error in attempt 1 getting Kafka offsets: 
org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]
	at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205)
	at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$11.apply(KafkaOffsetReader.scala:217)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$11.apply(KafkaOffsetReader.scala:215)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:358)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:357)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:357)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:356)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:215)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:215)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:325)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:214)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:207)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:202)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.getOrCreateInitialPartitionOffsets(KafkaMicroBatchReader.scala:202)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets$lzycompute(KafkaMicroBatchReader.scala:83)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$initialPartitionOffsets(KafkaMicroBatchReader.scala:83)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.setOffsetRange(KafkaMicroBatchReader.scala:87)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply$mcV$sp(MicroBatchExecution.scala:353)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply(MicroBatchExecution.scala:353)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$2.apply(MicroBatchExecution.scala:353)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:349)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)
	at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:772)
	at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
	at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
	at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
	... 65 more
Caused by: KrbException: Server not found in Kerberos database (7) - LOOKING_UP_SERVER
	at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:73)
	at sun.security.krb5.KrbTgsReq.getReply(KrbTgsReq.java:225)
	at sun.security.krb5.KrbTgsReq.sendAndGetCreds(KrbTgsReq.java:236)
	at sun.security.krb5.internal.CredentialsUtil.serviceCredsSingle(CredentialsUtil.java:400)
	at sun.security.krb5.internal.CredentialsUtil.serviceCreds(CredentialsUtil.java:287)
	at sun.security.krb5.internal.CredentialsUtil.serviceCreds(CredentialsUtil.java:263)
	at sun.security.krb5.internal.CredentialsUtil.acquireServiceCreds(CredentialsUtil.java:118)
	at sun.security.krb5.Credentials.acquireServiceCreds(Credentials.java:490)
	at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:695)
	... 68 more
Caused by: KrbException: Identifier doesn't match expected value (906)
	at sun.security.krb5.internal.KDCRep.init(KDCRep.java:140)
	at sun.security.krb5.internal.TGSRep.init(TGSRep.java:65)
	at sun.security.krb5.internal.TGSRep.<init>(TGSRep.java:60)
	at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:55)
	... 76 more
11634 [Driver] INFO  org.apache.spark.sql.execution.streaming.CheckpointFileManager  - Writing atomically to hdfs://klbigdata/data/hadoop/yarn/local/usercache/dev/appcache/application_1646032758327_0045/container_e42_1646032758327_0045_01_000001/tmp/temporary-dff48b8b-ca1f-46e8-bb90-61b44cfc8700/metadata using temp file hdfs://klbigdata/data/hadoop/yarn/local/usercache/dev/appcache/application_1646032758327_0045/container_e42_1646032758327_0045_01_000001/tmp/temporary-dff48b8b-ca1f-46e8-bb90-61b44cfc8700/.metadata.93ccd1f5-9f4a-4d03-bb08-319ecaa88aad.tmp
11651 [Driver] INFO  org.apache.spark.sql.execution.streaming.CheckpointFileManager  - Renamed temp file hdfs://klbigdata/data/hadoop/yarn/local/usercache/dev/appcache/application_1646032758327_0045/container_e42_1646032758327_0045_01_000001/tmp/temporary-dff48b8b-ca1f-46e8-bb90-61b44cfc8700/.metadata.93ccd1f5-9f4a-4d03-bb08-319ecaa88aad.tmp to hdfs://klbigdata/data/hadoop/yarn/local/usercache/dev/appcache/application_1646032758327_0045/container_e42_1646032758327_0045_01_000001/tmp/temporary-dff48b8b-ca1f-46e8-bb90-61b44cfc8700/metadata
11663 [Driver] INFO  org.apache.spark.sql.execution.streaming.MicroBatchExecution  - Starting writeToHudi [id = 287529a2-e0b1-4f36-8711-a2f9a7475848, runId = 81501e42-3f28-4cbb-820d-2112bfb0a612]. Use hdfs://klbigdata/data/hadoop/yarn/local/usercache/dev/appcache/application_1646032758327_0045/container_e42_1646032758327_0045_01_000001/tmp/temporary-dff48b8b-ca1f-46e8-bb90-61b44cfc8700 to store the query checkpoint.
11665 [stream execution thread for writeToHudi [id = 287529a2-e0b1-4f36-8711-a2f9a7475848, runId = 81501e42-3f28-4cbb-820d-2112bfb0a612]] INFO  org.apache.spark.sql.execution.streaming.MicroBatchExecution  - Using MicroBatchReader [KafkaV2[Subscribe[test1]]] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@4b9b6784]
11667 [stream execution thread for writeToHudi [id = 287529a2-e0b1-4f36-8711-a2f9a7475848, runId = 81501e42-3f28-4cbb-820d-2112bfb0a612]] INFO  org.apache.spark.sql.execution.streaming.MicroBatchExecution  - Starting new streaming query.
11667 [stream execution thread for writeToHudi [id = 287529a2-e0b1-4f36-8711-a2f9a7475848, runId = 81501e42-3f28-4cbb-820d-2112bfb0a612]] INFO  org.apache.spark.sql.execution.streaming.MicroBatchExecution  - Stream started from {}
11673 [stream execution thread for writeToHudi [id = 287529a2-e0b1-4f36-8711-a2f9a7475848, runId = 81501e42-3f28-4cbb-820d-2112bfb0a612]] INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [awnx1-cdata-pnode06:9092, awnx1-cdata-pnode07:9092, awnx1-cdata-pnode08:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = spark-kafka-source-dfd6e090-e495-4c80-9896-66a944dd07d2--1122458013-driver-0
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 1
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = kafka
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = SASL_PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Solution: in the kafka.bootstrap.servers parameter, replace the broker host aliases with the brokers' real hostnames. Our Kafka brokers' principals have the form kafka/ip-10-129-24-122.cn-northwest-1.compute.internal@kunlun.prod, i.e. the host part is the machine's hostname ("ip-10-129-24-122.cn-northwest-1.compute.internal"), not the host alias ("awnx1-cdata-pnode06"). When the connection parameters use the alias, the client requests a service ticket for a principal built from that alias, which does not match any real broker principal in the Kerberos database, hence "Server not found in Kerberos database".
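With that diagnosis, the fix is only a change to the broker addresses. A hedged sketch of the corrected source definition; only ip-10-129-24-122.cn-northwest-1.compute.internal is taken from the broker principal above, and the other two hostnames are placeholders for the real hostnames of the remaining brokers:

// Sketch of the fix: use the brokers' real hostnames (the host part of their Kerberos
// principals) instead of the host aliases. The hostnames for nodes 07/08 are placeholders.
val dfFixed = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers",
    "ip-10-129-24-122.cn-northwest-1.compute.internal:9092," +
    "ip-10-129-24-xxx.cn-northwest-1.compute.internal:9092," +   // placeholder
    "ip-10-129-24-yyy.cn-northwest-1.compute.internal:9092")     // placeholder
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("subscribe", topic)
  .load()

If the real hostnames are not known offhand, they can usually be recovered from the host part of each broker's principal in the KDC, or (DNS permitting) via java.net.InetAddress.getByName(alias).getCanonicalHostName.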
