Kafka producer and consumer not working in TLS enabled cluster #427

Closed
kumar-b opened this issue Aug 4, 2016 · 14 comments

@kumar-b

kumar-b commented Aug 4, 2016

The 3-node Kafka cluster is set up with TLS, with the inter-broker communication protocol set to SSL. I have created a test topic with 3 partitions and a replication factor of 3. The kafka-node producer is able to send messages to one of the partitions but gives an error for the other two. Similarly, the kafka-node consumer does not get any messages from two of the brokers. The Kafka console producer and consumer work fine with this setup.
Please note that the kafka-node producer and consumer work fine with PLAINTEXT Kafka listeners.

I am using the following code to produce a test message:

var fs = require('fs');
var kafka = require('kafka-node');

var partition = process.argv[2] || 0;
var Client = kafka.Client;
var options = {
    pfx: fs.readFileSync('<PKCS_FILE>'),
    passphrase: '<PASSPHRASE>',
    requestCert: true,
    rejectUnauthorized: false
};

var kafkaClient = new Client('<ZK_SYSTEM>:<ZK_PORT>', 'kafka-node', {}, {}, options);
var producer;
var Producer = kafka.Producer;

function createProducer() {
    producer = new Producer(kafkaClient);
    producer.on('error', function (err) {
        throw new Error(err);
    });

    producer.on('ready', function () {
        console.log("producer is ready !!!");
        console.log('sending data to partition- ' + partition);
        producer.send([
            {topic: 'test3', messages: 'message', partition: partition}
        ], function (err, result) {
            if (err) {
                throw new Error(err);
            }
            console.log(err, result);
        });
    });
}

createProducer();

It gives the following error for partitions 0 and 1:

$ node ./producer.js 0
producer is ready !!!
sending data to partition- 0
/opt/nodetests/kafka-ssl/producer.js:34
                        throw new Error(err);
                        ^

Error: Error: UnknownTopicOrPartition
    at /opt/nodetests/kafka-ssl/producer.js:34:10
    at /opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:194:17
    at null.<anonymous> (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:244:31)
    at Client.handleReceivedData (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:622:18)
    at TLSSocket.<anonymous> (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:594:14)
    at emitOne (events.js:77:13)
    at TLSSocket.emit (events.js:169:7)
    at readableAddChunk (_stream_readable.js:153:18)
    at TLSSocket.Readable.push (_stream_readable.js:111:10)
    at TLSWrap.onread (net.js:531:20)

For partition 2, it is able to send the message with no error.

$ node ./producer.js 2
producer is ready !!!
sending data to partition- 2
null { test3: { '2': 6 } }

With some other topics, we sometimes get the following errors (with stack traces):

$ node ./producer.js
producer is ready !!!
Error: NotLeaderForPartition
    at Object.<anonymous> (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/protocol/protocol.js:417:29)
    at Object.self.tap (/opt/nodetests/kafka-ssl/node_modules/kafka-node/node_modules/binary/index.js:248:12)
    at Object.decodePartitions (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/protocol/protocol.js:415:14)
    at Object.self.loop (/opt/nodetests/kafka-ssl/node_modules/kafka-node/node_modules/binary/index.js:267:16)
    at Object.<anonymous> (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/protocol/protocol.js:60:14)
    at Object.self.loop (/opt/nodetests/kafka-ssl/node_modules/kafka-node/node_modules/binary/index.js:267:16)
    at decodeProduceResponse (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/protocol/protocol.js:407:10)
    at Client.handleReceivedData (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:620:22)
    at TLSSocket.<anonymous> (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:594:14)
    at emitOne (events.js:77:13)
    at TLSSocket.emit (events.js:169:7)
    at readableAddChunk (_stream_readable.js:153:18)
    at TLSSocket.Readable.push (_stream_readable.js:111:10)
    at TLSWrap.onread (net.js:531:20)
/opt/nodetests/kafka-ssl/producer.js:44
                if (err) throw new Error(err);
                         ^

Error: Error: NotLeaderForPartition
    at /opt/nodetests/kafka-ssl/producer.js:44:18
    at /opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:194:17
    at null.<anonymous> (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:244:31)
    at Client.handleReceivedData (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:622:18)
    at TLSSocket.<anonymous> (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:594:14)
    at emitOne (events.js:77:13)
    at TLSSocket.emit (events.js:169:7)
    at readableAddChunk (_stream_readable.js:153:18)
    at TLSSocket.Readable.push (_stream_readable.js:111:10)
    at TLSWrap.onread (net.js:531:20)

kumar-b changed the title from "Kafka producer and cosnumer not working in TLS enabled cluster" to "Kafka producer and consumer not working in TLS enabled cluster" Aug 4, 2016
@kumar-b
Author

kumar-b commented Aug 4, 2016

Also, no issues were noticed with a single-machine Kafka setup with TLS.

@kumar-b kumar-b mentioned this issue Aug 4, 2016
@hyperlink
Collaborator

Thanks for the report. I will look into this.

@hyperlink
Collaborator

@kumar-b, could you attach your producer send block to the "connect" event of your kafkaClient instead of "ready", and let me know how that works for you?

@kumar-b
Author

kumar-b commented Aug 4, 2016

It gave the same error as reported above.

@hyperlink
Collaborator

Hmm, are the pfx and passphrase required? We don't use them to connect to our cluster. Also, what version of Kafka are you running?

@kumar-b
Author

kumar-b commented Aug 4, 2016

The pfx file contains the client's key and certificate, and it is password-protected. Since the client is able to connect to other brokers, the pfx part is probably not the cause. We are running Kafka 0.10.

@hyperlink
Collaborator

Try calling kafkaClient.refreshMetadata(topic, callback) before you send. You can also echo out the contents of kafkaClient.topicMetadata, which may provide more information.

@kumar-b
Author

kumar-b commented Aug 5, 2016

I refreshed the metadata before the call, and echoed out the topic metadata for three different runs:

$ node ./producer-connect.js 1 test3
sending data to topic- test3
sending data to partition- 1
topic metadata: {"test3":{"0":{"topic":"test3","partition":0,"leader":2,"replicas":[2],"isr":[2]},"1":{"topic":"test3","partition":1,"leader":3,"replicas":[3],"isr":[3]},"2":{"topic":"test3","partition":2,"leader":1,"replicas":[1],"isr":[1]}}}
producer is ready !!!
sending data to partition- 1
/opt/nodetests/kafka-ssl/producer-connect.js:44
                                throw new Error(err);
                                ^

Error: Error: UnknownTopicOrPartition
    at /opt/nodetests/kafka-ssl/producer-connect.js:44:11
    at /opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:194:17
    at null.<anonymous> (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:244:31)
    at Client.handleReceivedData (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:622:18)
    at TLSSocket.<anonymous> (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:594:14)
    at emitOne (events.js:77:13)
    at TLSSocket.emit (events.js:169:7)
    at readableAddChunk (_stream_readable.js:153:18)
    at TLSSocket.Readable.push (_stream_readable.js:111:10)
    at TLSWrap.onread (net.js:531:20)
$ node ./producer-connect.js 0 test3
sending data to topic- test3
sending data to partition- 0
topic metadata: {"test3":{"0":{"topic":"test3","partition":0,"leader":2,"replicas":[2],"isr":[2]},"1":{"topic":"test3","partition":1,"leader":3,"replicas":[3],"isr":[3]},"2":{"topic":"test3","partition":2,"leader":1,"replicas":[1],"isr":[1]}}}
producer is ready !!!
sending data to partition- 0
/opt/nodetests/kafka-ssl/producer-connect.js:44
                                throw new Error(err);
                                ^

Error: Error: UnknownTopicOrPartition
    at /opt/nodetests/kafka-ssl/producer-connect.js:44:11
    at /opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:194:17
    at null.<anonymous> (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:244:31)
    at Client.handleReceivedData (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:622:18)
    at TLSSocket.<anonymous> (/opt/nodetests/kafka-ssl/node_modules/kafka-node/lib/client.js:594:14)
    at emitOne (events.js:77:13)
    at TLSSocket.emit (events.js:169:7)
    at readableAddChunk (_stream_readable.js:153:18)
    at TLSSocket.Readable.push (_stream_readable.js:111:10)
    at TLSWrap.onread (net.js:531:20)
$ node ./producer-connect.js 2 test3
sending data to topic- test3
sending data to partition- 2
topic metadata: {"test3":{"0":{"topic":"test3","partition":0,"leader":2,"replicas":[2],"isr":[2]},"1":{"topic":"test3","partition":1,"leader":3,"replicas":[3],"isr":[3]},"2":{"topic":"test3","partition":2,"leader":1,"replicas":[1],"isr":[1]}}}
producer is ready !!!
sending data to partition- 2
null { test3: { '2': 8 } }

@kumar-b
Author

kumar-b commented Aug 5, 2016

The above runs used a topic with a replication factor of 1 and 3 partitions, but the same behavior was observed with 3 partitions and a replication factor of 3.
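The echoed metadata shows that each partition of test3 is led by a different broker. A minimal sketch for pulling that mapping out of a topicMetadata object, in plain JavaScript with no kafka-node dependency; the helper name leadersByPartition is hypothetical, and the metadata object is copied from the runs above.

```javascript
// Topic metadata as echoed by the producer-connect.js runs above.
const topicMetadata = {
  test3: {
    0: { topic: 'test3', partition: 0, leader: 2, replicas: [2], isr: [2] },
    1: { topic: 'test3', partition: 1, leader: 3, replicas: [3], isr: [3] },
    2: { topic: 'test3', partition: 2, leader: 1, replicas: [1], isr: [1] }
  }
};

// Hypothetical helper: map each partition of a topic to its leader broker id.
// Returns an empty object when the topic is not in the metadata.
function leadersByPartition(metadata, topic) {
  const result = {};
  for (const [partition, info] of Object.entries(metadata[topic] || {})) {
    result[partition] = info.leader;
  }
  return result;
}

console.log(leadersByPartition(topicMetadata, 'test3'));
// prints { '0': 2, '1': 3, '2': 1 }
```

Only partition 2 (leader 1) accepted messages. That pattern is consistent with every request reaching broker 1: with a replication factor of 1 it holds no replica of partitions 0 and 1 (UnknownTopicOrPartition), while with a replication factor of 3 it holds non-leader replicas (NotLeaderForPartition).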

@kumar-b
Author

kumar-b commented Aug 10, 2016

Hi @hyperlink

After some debugging, I found the root cause.

Since Kafka 0.9, the host and port properties are deprecated and listeners is the single source of truth, so we set up our Kafka brokers without host and port. As a result, the metadata of a broker looks like this:

//  metadata for leader- 1 
{"jmx_port":-1,"timestamp":"1470139709693","endpoints":["SSL://<HOST1>:<PORT1>"],"host":null,"version":3,"port":-1,"sslHost":"<HOST1>","sslPort":<PORT1>}

Here host and port are null and -1. In a cluster, every broker will have host and port set to null and -1.
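Because host and port carry no usable address here, the real address has to come from the endpoints list (or from the sslHost/sslPort fields shown above). A minimal sketch of extracting it from a registration record, assuming every endpoint has the PROTOCOL://host:port form; the helper endpointFor and the sample hostname and port are hypothetical.

```javascript
// Broker registration like the one above; the hostname and port are
// hypothetical stand-ins for the <HOST1>/<PORT1> placeholders.
const brokerInfo = {
  jmx_port: -1,
  timestamp: '1470139709693',
  endpoints: ['SSL://broker1.example.com:9093'],
  host: null,
  version: 3,
  port: -1
};

// Hypothetical helper: return { host, port } of the first endpoint that uses
// the given security protocol, or null if the broker has no such endpoint.
// Assumes each endpoint has the form "PROTOCOL://host:port".
function endpointFor(info, protocol) {
  const prefix = protocol + '://';
  for (const endpoint of info.endpoints || []) {
    if (!endpoint.startsWith(prefix)) continue;
    const address = endpoint.slice(prefix.length);
    // Split on the last colon so bracketed IPv6 hosts like [::1] stay intact.
    const sep = address.lastIndexOf(':');
    if (sep === -1) continue; // malformed entry without a port
    return {
      host: address.slice(0, sep),
      port: Number(address.slice(sep + 1))
    };
  }
  return null;
}

console.log(endpointFor(brokerInfo, 'SSL'));       // { host: 'broker1.example.com', port: 9093 }
console.log(endpointFor(brokerInfo, 'PLAINTEXT')); // null
```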
In kafka-node, however, after connecting the client reads the broker metadata, extracts the host and port, and stores them as a key-value pair in self.brokers:
https://github.com/SOHU-Co/kafka-node/blob/master/lib/client.js#L88

var broker = brokers[key];
self.setupBroker(broker.host, broker.port, false, self.brokers);

Since all brokers have the same host and port (null and -1), each entry overwrites the previous one. Client.prototype.brokerForLeader then returns the wrong broker information, so the producer and consumer fail:
https://github.com/SOHU-Co/kafka-node/blob/master/lib/client.js#L525
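The collision can be reproduced without a cluster. The following is a simplified model, not the kafka-node code itself: buildBrokerMap and entryForLeader are hypothetical stand-ins for setupBroker and brokerForLeader, and the broker records mimic the null/-1 metadata above with made-up sslHost values.

```javascript
// Three SSL-only brokers: host/port are null/-1 for all of them.
// The sslHost/sslPort values are hypothetical.
const brokers = {
  1: { host: null, port: -1, sslHost: 'broker1.example.com', sslPort: 9093 },
  2: { host: null, port: -1, sslHost: 'broker2.example.com', sslPort: 9093 },
  3: { host: null, port: -1, sslHost: 'broker3.example.com', sslPort: 9093 }
};

// Simplified model of setupBroker(broker.host, broker.port, ...):
// one entry per broker, keyed by "host:port".
function buildBrokerMap(brokerMetadata) {
  const map = {};
  for (const id of Object.keys(brokerMetadata)) {
    const b = brokerMetadata[id];
    // null + ':' + -1 yields "null:-1" for every broker, so later writes win.
    map[b.host + ':' + b.port] = { brokerId: Number(id) };
  }
  return map;
}

// Simplified model of brokerForLeader: look up the leader's entry by the same key.
function entryForLeader(brokerMetadata, map, leaderId) {
  const b = brokerMetadata[leaderId];
  return map[b.host + ':' + b.port];
}

const map = buildBrokerMap(brokers);
console.log(Object.keys(map)); // [ 'null:-1' ]
console.log([1, 2, 3].map(id => entryForLeader(brokers, map, id).brokerId)); // [ 3, 3, 3 ]
```

Every leader resolves to the same single entry, so requests for all partitions end up on one broker. Which broker survives depends on the order in which entries are written; in this model it is the last one.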

I hope this analysis helps in fixing the issue. Please also check whether the same problem affects other parts of the module.

@kumar-b
Author

kumar-b commented Aug 10, 2016

Actually, this is not a configuration issue at all. In SSL mode, host and port will be null and -1, while sslHost and sslPort hold the real values. So the broker setup code should choose which pair to use based on self.ssl.
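A sketch of the change proposed here, using the same kind of simplified model as a plain JavaScript stand-in: in SSL mode, key and look up brokers by sslHost/sslPort instead of host/port. The names brokerAddress, buildBrokerMap, entryForLeader and the useSsl flag are hypothetical (useSsl plays the role of self.ssl), and this is not taken from the fix-ssl-only-clusters branch.

```javascript
// Three SSL-only brokers; the sslHost/sslPort values are hypothetical.
const brokers = {
  1: { host: null, port: -1, sslHost: 'broker1.example.com', sslPort: 9093 },
  2: { host: null, port: -1, sslHost: 'broker2.example.com', sslPort: 9093 },
  3: { host: null, port: -1, sslHost: 'broker3.example.com', sslPort: 9093 }
};

// Pick the address pair that matches the client's mode.
function brokerAddress(broker, useSsl) {
  return useSsl
    ? { host: broker.sslHost, port: broker.sslPort }
    : { host: broker.host, port: broker.port };
}

// Build the broker map with the mode-appropriate address as the key.
function buildBrokerMap(brokerMetadata, useSsl) {
  const map = {};
  for (const id of Object.keys(brokerMetadata)) {
    const { host, port } = brokerAddress(brokerMetadata[id], useSsl);
    map[host + ':' + port] = { brokerId: Number(id) };
  }
  return map;
}

// Leader lookup uses the same address choice, so setup and lookup agree.
function entryForLeader(brokerMetadata, map, leaderId, useSsl) {
  const { host, port } = brokerAddress(brokerMetadata[leaderId], useSsl);
  return map[host + ':' + port];
}

const map = buildBrokerMap(brokers, true);
console.log(Object.keys(map).length); // 3
console.log([1, 2, 3].map(id => entryForLeader(brokers, map, id, true).brokerId)); // [ 1, 2, 3 ]
```

Routing both the setup and the leader lookup through one address function keeps them consistent, and a PLAINTEXT client (useSsl false) keeps using host and port as before.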

@hyperlink hyperlink added the bug label Aug 10, 2016
@hyperlink hyperlink added this to the 0.5.5 milestone Aug 10, 2016
@hyperlink
Collaborator

Great debugging @kumar-b. I hope to have this fixed soon.

@hyperlink
Collaborator

@kumar-b, could you temporarily point your package.json to the fix-ssl-only-clusters branch and see whether this resolves the issue for you?

@kumar-b
Author

kumar-b commented Aug 11, 2016

It works with the positive tests. I haven't tried any negative tests.
