procedure TKafkaConsumerThread.DoSetup;
i: Integer;
TopicList: prd_kafka_topic_partition_list_t;
err: rd_kafka_resp_err_t;
begin
FKafkaHandle := TKafkaHelper.NewConsumer(FConfiguration);
if rd_kafka_brokers_add(FKafkaHandle, PAnsiChar(AnsiString(FBrokers))) = 0 then
begin
raise EKafkaError.Create(StrBrokersCouldNotBe);
rd_kafka_poll_set_consumer(FKafkaHandle);
TopicList := rd_kafka_topic_partition_list_new(0);
for i := Low(FTopics) to High(FTopics) do
begin
rd_kafka_topic_partition_list_add(
TopicList,
PAnsiChar(AnsiString(FTopics[i])), // 'test' works, '^*' does not
FPartitions[i]);