apache kafka - Is there any workaround for SimpleConsumer to read only the new messages?
As mentioned in the SimpleConsumer example here:
https://cwiki.apache.org/confluence/display/kafka/0.8.0+simpleconsumer+example
Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn't the beginning of the compressed block. Thus a message we saw previously may be returned again.
Finally, we keep track of the number of messages read. If we didn't read anything on the last request, we go to sleep for a second so we aren't hammering Kafka when there is no data.
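For reference, here is a condensed sketch of that fetch loop, adapted from the wiki example. The class name FetchLoopSketch and the helper fetchOnce are hypothetical names for illustration; the 100000 fetch size mirrors the wiki's value.

import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.MessageAndOffset;

public class FetchLoopSketch {
    // Reads from readOffset onward and returns the next offset to request.
    static long fetchOnce(SimpleConsumer consumer, String clientName,
                          String topic, int partition, long readOffset) throws InterruptedException {
        FetchResponse fetchResponse = consumer.fetch(
                new FetchRequestBuilder()
                        .clientId(clientName)
                        .addFetch(topic, partition, readOffset, 100000) // max bytes per fetch
                        .build());

        long numRead = 0;
        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
            // Skip messages replayed because a compressed block started before readOffset.
            if (messageAndOffset.offset() < readOffset) {
                continue;
            }
            readOffset = messageAndOffset.nextOffset();
            // ... process messageAndOffset.message() here ...
            numRead++;
        }
        if (numRead == 0) {
            Thread.sleep(1000); // nothing new; back off so we don't hammer the broker
        }
        return readOffset;
    }
}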
As in, the program reads all of the old messages first, goes to sleep once it has caught up, and only then reads new records.
Is there any workaround so that SimpleConsumer reads only the new messages?
From the same page:
import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    // Ask for a single offset at or before whichTime for this topic/partition.
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching offset data from the broker. Reason: "
                + response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
It also says this about finding the offset to read from:
Kafka includes two constants to help: kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, and kafka.api.OffsetRequest.LatestTime() will only stream new messages. Don't assume that offset 0 is the beginning offset, since messages age out of the log over time.
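That quote is effectively the workaround: seed the consumer with kafka.api.OffsetRequest.LatestTime() instead of EarliestTime() when calling getLastOffset, so the starting offset is the head of the log and only messages produced afterwards are fetched. A minimal sketch, assuming the getLastOffset method above is in scope; the broker host, topic, and client names are placeholders:

import kafka.javaapi.consumer.SimpleConsumer;

public class LatestOnlySketch {
    public static void main(String[] args) throws Exception {
        // Placeholder broker/topic/client names for illustration.
        SimpleConsumer consumer = new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "newOnlyClient");
        // LatestTime() resolves to the current log-end offset, so the first
        // fetch from readOffset returns only messages produced after this call.
        long readOffset = getLastOffset(consumer, "myTopic", 0,
                kafka.api.OffsetRequest.LatestTime(), "newOnlyClient");
        // ... fetch from readOffset as in the loop sketched earlier ...
        consumer.close();
    }
}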