改为手动提交
This commit is contained in:
parent
4eea6929d7
commit
22fcd5e651
2
app.py
2
app.py
@ -79,7 +79,7 @@ class XProcess(Process):
|
|||||||
else:
|
else:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
transmitter.run()
|
transmitter.run(kafka_client)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
@ -130,7 +130,7 @@ class Transmitter:
|
|||||||
for key in del_keys:
|
for key in del_keys:
|
||||||
del item[key]
|
del item[key]
|
||||||
|
|
||||||
def run(self):
|
def run(self, kafka_client):
|
||||||
for tb, buffer in self.check_send():
|
for tb, buffer in self.check_send():
|
||||||
try:
|
try:
|
||||||
data = [self.flat_data(x) for x in buffer.values()]
|
data = [self.flat_data(x) for x in buffer.values()]
|
||||||
@ -141,3 +141,7 @@ class Transmitter:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.error(traceback.format_exc())
|
self.log.error(traceback.format_exc())
|
||||||
buffer.clear()
|
buffer.clear()
|
||||||
|
try:
|
||||||
|
kafka_client.commit()
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(e)
|
||||||
|
Loading…
Reference in New Issue
Block a user