Description
I'm using this library in the end-of-message callback of a milter application. I have the email message as a JSON object, and I need to hand it off to Kafka before I delete it from the Postfix queue. Here is the code:
rk=rd_kafka_new(RD_KAFKA_PRODUCER, kafka_url, NULL);
if(rk != NULL) {
    rd_kafka_produce(rk,"Trout-test",0,RD_KAFKA_OP_F_FREE,scratch,strlen(scratch));
    fprintf(logfile,"TID: %ld :: %ld :: scmfi_eom - Sent %d bytes to Trout-test:0\n",(long int)syscall(SYS_gettid),time(NULL),strlen(scratch));
    while (rd_kafka_outq_len(rk) > 0) { /* Pulled this from the rdkafka_example */
        usleep(50000);
    }
    fprintf(logfile,"TID: %ld :: %ld :: scmfi_eom - kafka outq is now 0\n",(long int)syscall(SYS_gettid),time(NULL));
    //free(scratch); /* the rd_kafka_produce call is freeing scratch (RD_KAFKA_OP_F_FREE) */
    usleep(500000);
    rd_kafka_destroy(rk); /* Destroy the Kafka handle */
}
When I run this code, everything works fine until about 1000 messages have gone through the MTA. At that point, rd_kafka_new starts failing with this message: Failed to create inet socket: Too many open files
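For anyone reproducing this, here is a minimal sketch of how the descriptor count could be logged from inside the process after each message, rather than checked externally. It assumes Linux, since it reads /proc/self/fd. The helper names count_open_fds and open_fd_limit are hypothetical and not part of librdkafka.

```c
#include <dirent.h>
#include <limits.h>
#include <stdio.h>
#include <sys/resource.h>

/* Hypothetical helper: count this process's open file descriptors by
 * listing /proc/self/fd (Linux-specific). Returns -1 if it cannot be read. */
int count_open_fds(void)
{
    DIR *dir = opendir("/proc/self/fd");
    if (dir == NULL)
        return -1;

    int count = 0;
    struct dirent *entry;
    while ((entry = readdir(dir)) != NULL) {
        if (entry->d_name[0] != '.')    /* skip "." and ".." */
            count++;
    }
    closedir(dir);

    /* The listing includes the descriptor that opendir() itself held. */
    return count - 1;
}

/* Hypothetical helper: the soft limit on open files (what "ulimit -n"
 * reports), LONG_MAX if unlimited, or -1 on error. */
long open_fd_limit(void)
{
    struct rlimit lim;
    if (getrlimit(RLIMIT_NOFILE, &lim) != 0)
        return -1;
    if (lim.rlim_cur == RLIM_INFINITY)
        return LONG_MAX;
    return (long)lim.rlim_cur;
}
```

Logging count_open_fds() next to open_fd_limit() at the end of the callback would show whether the count grows by a fixed amount per message, which is what a per-message leak of sockets would look like.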
So I raised my open-file limit with ulimit to more than 200000 (I was sending batches of 100000 messages), and then it started failing at around 30000 messages because there were no more ephemeral ports available for connections to the broker.
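The figure of roughly 30000 is in line with the size of the ephemeral range on a typical Linux host, where every outgoing connection to the same broker address takes one local port. As a rough sketch, assuming Linux and its /proc/sys/net/ipv4/ip_local_port_range file, the number of available ports could be computed as below. Both function names are hypothetical.

```c
#include <stdio.h>

/* Hypothetical helper: given the text of ip_local_port_range, which holds
 * two integers such as "32768\t60999\n", return how many ports the range
 * contains, or -1 if the text cannot be parsed. */
int ephemeral_port_count(const char *range_text)
{
    int low, high;
    if (sscanf(range_text, "%d %d", &low, &high) != 2 || high < low)
        return -1;
    return high - low + 1;   /* the range is inclusive at both ends */
}

/* Hypothetical helper: read the range from /proc (Linux-specific).
 * Returns -1 if the file is missing or unreadable. */
int read_ephemeral_port_count(void)
{
    char buf[64];
    FILE *f = fopen("/proc/sys/net/ipv4/ip_local_port_range", "r");
    if (f == NULL)
        return -1;
    char *line = fgets(buf, sizeof buf, f);
    fclose(f);
    return line != NULL ? ephemeral_port_count(buf) : -1;
}
```

With a range of 32768 to 60999, for instance, ephemeral_port_count returns 28232, so a process that never releases its connections would run out at about that many messages.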
When I look at the source, I see the close call on the socket, but when I follow the execution with lsof, or just netstat, the sockets are all still established. Am I using rd_kafka_new, rd_kafka_produce, and rd_kafka_destroy improperly (calling each once per message), or is this an actual problem?
Thank you,
Paul