Subscription: declare the referenced code for timer in license & fixup committable logic for ack (#13795)

This commit is contained in:
V_Galaxy 2024-10-17 10:31:32 +08:00 committed by GitHub
parent e20680cf05
commit 4d8d1c309f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 9 deletions

View File

@ -267,3 +267,11 @@ which is under Apache License 2.0:
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
./iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/*
--------------------------------------------------------------------------------
The following files include code modified from Apache Kafka project.
./iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/PollTimer.java
Project page: https://github.com/apache/kafka
License: http://www.apache.org/licenses/LICENSE-2.0

View File

@ -423,15 +423,6 @@ public abstract class SubscriptionPrefetchingQueue {
return null;
}
if (ev.isCommitted()) {
LOGGER.warn(
"Subscription: subscription event {} is committed, subscription commit context {}, prefetching queue: {}",
ev,
commitContext,
this);
return ev;
}
if (ev.isCommitted()) {
LOGGER.warn(
"Subscription: subscription event {} is committed, subscription commit context {}, prefetching queue: {}",
@ -443,6 +434,15 @@ public abstract class SubscriptionPrefetchingQueue {
return null; // remove this entry
}
if (!ev.isCommittable()) {
LOGGER.warn(
"Subscription: subscription event {} is not committable, subscription commit context {}, prefetching queue: {}",
ev,
commitContext,
this);
return ev;
}
// check if a consumer acks event from another consumer group...
final String consumerGroupId = commitContext.getConsumerGroupId();
if (!Objects.equals(consumerGroupId, brokerId)) {