Spark (Streaming) - Kafka on Kerberized HDP

The common issues we probably encounter when doing debugging. The environment is HDP 2.6.1.0 with FreeIPA kerberized, Spark 1.6.3 and Kafka 0.10.1
Refer to the test code from below link [–> added lines enabling it running in kerberized.]
JavaDirectKafkaWordCount

package spark.example;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.regex.Pattern;
import scala.Tuple2;
import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;
public final class JavaDirectKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile( " " );
    public static void main(String[] args) {
        // Create context with a 2 seconds batch interval
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext( sparkConf, Durations.seconds( 200 ) );
        HashSet<String> topicsSet = new HashSet<String>( Arrays.asList( "kerb" ) );
        HashMap<String, String> kafkaParams = new HashMap<String,String>();
        kafkaParams.put("bootstrap.servers", "local0.field.hortonworks.com:6667");
        kafkaParams.put("group.id", "test_sparkstreaming_kafka");
        kafkaParams.put("auto.offset.reset", "largest");
        kafkaParams.put("security.protocol", "PLAINTEXTSASL");
        System.setProperty("java.security.auth.login.config","/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );
        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = messages.map( new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        } );
        JavaDStream<String> words = lines.flatMap( new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList( SPACE.split( x ) );
            }
        } );
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>( s, 1 );
                    }
                } ).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                } );
        wordCounts.print();
        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
} 

The jaas configuration file wraps the principal and its keytab path.
Here we have directly used the kafka service account. Ideally you should build the user account for this.

[root@local0 ~]# cat /usr/hdp/current/kafka-broker/config/kafka_client_jaas.confKafkaClient
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
doNotPrompt=true
principal="kafka/local0.field.hortonworks.com@test.hortonworks.com"
keyTab="kafka.service.keytab"
useTicketCache=true
renewTicket=true
serviceName="kafka";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
doNotPrompt=true
principal="kafka/local0.field.hortonworks.com@test.hortonworks.com"
keyTab="kafka.service.keytab"
useTicketCache=true
renewTicket=true
serviceName="kafka";
}; 

Details on the submission command as below -

//spark job for kerberized hdp 2.5
spark-submit --master=yarn --deploy-mode=cluster \
--files /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--conf "spark.yarn.historyServer.address=http://local0.field.hortonworks.com:18080"  \
--conf "spark.eventLog.dir=hdfs:///spark-history"  \
--conf "spark.eventLog.enabled=true" \
--jars /usr/hdp/current/spark-client/lib/spark-assembly-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar,/usr/hdp/current/kafka-broker/libs/kafka_2.10-0.10.1.2.6.1.0-129.jar,/usr/hdp/current/spark-client/lib/spark-examples-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar \
--class spark.example.JavaDirectKafkaWordCount kafka-producer-1.0-SNAPSHOT.jar
There is no need such in HDP 2.6
//spark job for kerberized hdp 2.6
spark-submit --master=yarn --deploy-mode=cluster \
--files /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--jars /usr/hdp/current/spark-client/lib/spark-assembly-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar,/usr/hdp/current/kafka-broker/libs/kafka_2.10-0.10.1.2.6.1.0-129.jar,/usr/hdp/current/spark-client/lib/spark-examples-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar \
--class spark.example.JavaDirectKafkaWordCount kafka-producer-1.0-SNAPSHOT.jar 

Details on the above code and command, refer https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.3/bk_spark-guide/content/spark-streaming-kafka-kerb.html


  1. Zookeeper access right check

Because Kafka need to read topics and offset information from zookeeper znode, check if the acl satisfies

[root@local0 tmp]# cd /usr/hdp/current/zookeeper-client/bin
[root@local0 bin]# ./zkCli.sh -server local0:2181
[zk: local0:2181(CONNECTED) 0] getAcl /
'world,'anyone
: cdrwa
[zk: local0:2181(CONNECTED) 2] getAcl /consumers
'world,'anyone
: cdrwa
[zk: local0:2181(CONNECTED) 4] getAcl /brokers   
'world,'anyone
: cdrwa 

  1. HDFS access right check
[root@local0 ~]# hdfs dfs -ls /user
Found 10 items
drwxrwx--- - ambari-qa hdfs0 2017-09-17 13:34 /user/ambari-qa
drwxr-xr-x - hbase hdfs0 2017-08-20 00:08 /user/hbased
rwxr-xr-x - hcathdfs0 2017-08-06 00:55 /user/hcat
drwx------ - hdfshdfs0 2017-09-28 03:10 /user/hdfs
drwxr-xr-x - hivehdfs0 2017-08-06 00:55 /user/hive
drwxr-xr-x - kafka hdfs0 2017-09-28 12:23 /user/kafka
drwxrwxr-x - oozie hdfs0 2017-08-06 00:56 /user/oozie
drwxr-xr-x - hdfshdfs0 2017-08-13 11:47 /user/root
drwxrwxr-x - spark hdfs0 2017-09-08 16:54 /user/spark
drwxr-xr-x - yarnhdfs0 2017-09-16 22:17 /user/yarn 

Note by default there did not create path “/user/kafka”, so you need do it by hand and give right access

[root@local0 ~]# hdfs dfs -mkdir /user/kafka
[root@local0 ~]# hdfs dfs -chown kafka:hdfs /user/kafka 

  1. Jaas configuration file and Keytab file access right

Sparkstreaming job here is submitted in yarn cluster, which then plays as yarn user to read jaas and keytab files, should check the file acl. By default keytab file only its principal user to read, other users cannot access. You should grant other user to read those two files.

[root@local0 ~]# ls -l /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf
-rw-r--r-- 1 kafka hadoop 560 Sep 26 13:35 /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf
[root@local0 ~]# ls -l /etc/security/keytabs/kafka.service.keytab
-r--r--r-- 1 kafka hadoop 208 Aug6 01:53 /etc/security/keytabs/kafka.service.keytab 
  1. Specify path of Jaas and Keytab files

As emphasized in most Spark articles, Sparkstreaming driver wrap Jaas and Keytab files and sent to executor to parse. The very important step is specify the path on the driver machine. There is no need to hand-copy those files to all slave nodes.

--files /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf,/etc/security/keytabs/kafka.service.keytab \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \ 

If there's some configurations missed as detailed above , you will likely come across error(s) saying :
org.apache.spark.SparkException: Couldn’t connect to leader for topic

or

javax.security.auth.login.LoginException:Unable to obtain password from user
or

org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException:Jaas configuration not found

[root@local0 tmp]# yarn application -list -appStates RUNNING
17/09/30 22:22:08 INFO client.RMProxy: Connecting to ResourceManager at local1.field.hortonworks.com/172.26.197.243:8050
17/09/30 22:22:09 INFO client.AHSProxy: Connecting to Application History server at local1.field.hortonworks.com/172.26.197.243:10200
Total number of applications (application-types: [] and states: [RUNNING]):3
                Application-Id    Application-Name    Application-Type      User     Queue             State       Final-State       Progress                       Tracking-URL
application_1505572583831_0056spark.example.JavaDirectKafkaWordCount               SPARK     kafka   default           RUNNING         UNDEFINED            10%        http://172.26.197.246:37394
  • Use Yarn resource manager and History server webpage to check log

Yarn resource manager helps you dive into containers’ logs since spark execute tasks distributed across contained executors. While History server enable you to check spark DAG and full stack printed on each executor, and the more is collecting metrics help to do problem analysis.

  • Maven: specify HDP repository and dependent jar for compiling purpose

Should align with HDP release package and have jar files searched from HDP official repository

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
        <id>hortonworks.extrepo</id>
        <name>Hortonworks HDP</name>
        <url>http://repo.hortonworks.com/content/repositories/releases</url>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
        <id>hortonworks.other</id>
        <name>Hortonworks Other Dependencies</name>
        <url>http://repo.hortonworks.com/content/groups/public</url>
    </repository>
</repositories><br>

For this test code, we have following dependent package

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.3.2.6.1.0-129</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.3.2.6.1.0-129</version>
        <scope>provided</scope>
    </dependency>
</dependencies><br>

Once compiled the target jar, only pick non-dependencies to execute, and specify path of dependent jars in command.

--jars /usr/hdp/current/spark-client/lib/spark-assembly-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar,/usr/hdp/current/kafka-broker/libs/kafka_2.10-0.10.1.2.6.1.0-129.jar,/usr/hdp/current/spark-client/lib/spark-examples-1.6.3.2.6.1.0-129-hadoop2.7.3.2.6.1.0-129.jar \
--class spark.example.JavaDirectKafkaWordCount kafka-producer-1.0-SNAPSHOT.jar

**Use Kafka tool to test
After starting the above streaming process, meanwhile initiate an producer using

[root@local0 bin]# ./kafka-console-producer.sh --broker-list local0:6667 --topic kerb--security-protocol PLAINTEXTSASL
Test content..

Check the output log generated from SparkStreaming process, either from yarn or spark history server.

o/p #Time: 1506791200000 ms
(Kafka,1) (2.6,1) (HDP,1) (Other,1) (,1) (Connecting,1) (Kerberized,1) (Notable,1) (Issue,1) (in,1) …