Skip to content

Pipe: Fixed the event clear logic of drop pipe#17560

Open
Caideyipi wants to merge 12 commits intomasterfrom
drop-pipe-fix
Open

Pipe: Fixed the event clear logic of drop pipe#17560
Caideyipi wants to merge 12 commits intomasterfrom
drop-pipe-fix

Conversation

@Caideyipi
Copy link
Copy Markdown
Collaborator

@Caideyipi Caideyipi commented Apr 27, 2026

Description

This PR has guaranteed that: No more newly generated event will be sent after a successful pipe drop.
Contents:

  1. Banned any events from the retry-queue(asyncSink) and pending-queue(sinkSubtask)
  2. Cleared the pending queue in web socket sink
  3. Added creation time in pipe clear check

Test

Reuse pipe (empty + data) -> kill receiver -> drop data pipe
img_v3_02115_bb9de11d-1c8f-495f-b519-a9ad330e991g
img_v3_02115_a3e793af-7959-413c-8bc4-6a99d76cc69g


This PR has:

  • been self-reviewed.
    • concurrent read
    • concurrent write
    • concurrent read and write
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods.
  • added or updated version, license, or notice information
  • added comments explaining the "why" and the intent of the code wherever would not be obvious
    for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold
    for code coverage.
  • added integration tests.
  • been tested in a test IoTDB cluster.

Key changed/added classes (or packages if there are too many classes) in this PR

@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 27, 2026

Codecov Report

❌ Patch coverage is 66.04938% with 55 lines in your changes missing coverage. Please review.
✅ Project coverage is 40.12%. Comparing base (ce84224) to head (f0f416b).
⚠️ Report is 9 commits behind head on master.

Files with missing lines Patch % Lines
...k/protocol/websocket/WebSocketConnectorServer.java 66.17% 23 Missing ⚠️
...pe/agent/task/connection/BlockingPendingQueue.java 69.56% 7 Missing ⚠️
...rotocol/thrift/async/IoTDBDataRegionAsyncSink.java 70.00% 6 Missing ⚠️
...db/pipe/sink/protocol/websocket/WebSocketSink.java 0.00% 4 Missing ⚠️
...pache/iotdb/commons/pipe/datastructure/Triple.java 73.33% 4 Missing ⚠️
.../payload/evolvable/batch/PipeTabletEventBatch.java 0.00% 3 Missing ⚠️
...ubtask/sink/PipeRealtimePriorityBlockingQueue.java 50.00% 2 Missing ⚠️
.../pipe/agent/task/subtask/sink/PipeSinkSubtask.java 66.66% 2 Missing ⚠️
...nt/task/subtask/sink/PipeSinkSubtaskLifeCycle.java 0.00% 1 Missing ⚠️
...gent/task/subtask/sink/PipeSinkSubtaskManager.java 0.00% 1 Missing ⚠️
... and 2 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17560      +/-   ##
============================================
+ Coverage     39.85%   40.12%   +0.26%     
  Complexity     2547     2547              
============================================
  Files          5171     5174       +3     
  Lines        348174   348449     +275     
  Branches      44489    44549      +60     
============================================
+ Hits         138767   139811    +1044     
+ Misses       209407   208638     -769     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@sonarqubecloud
Copy link
Copy Markdown

Comment on lines +131 to +132
private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
ConcurrentHashMap.newKeySet();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment to explain what the three are.

Comment on lines 369 to +373
synchronized (queue) {
queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event));
}

queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why put twice?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this file used?

Comment on lines +184 to +201
protected static boolean isEventFromPipe(
final EnrichedEvent event,
final String pipeNameToDrop,
final long creationTimeToDrop,
final int regionId) {
return pipeNameToDrop.equals(event.getPipeName())
&& creationTimeToDrop == event.getCreationTime()
&& regionId == event.getRegionId();
}

protected boolean isEventFromDroppedPipe(final E event) {
return event instanceof EnrichedEvent
&& ((EnrichedEvent) event).getPipeName() != null
&& isPipeDropped(
((EnrichedEvent) event).getPipeName(),
((EnrichedEvent) event).getCreationTime(),
((EnrichedEvent) event).getRegionId());
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seem to be many similar methods.
Is it possible to do some extraction?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants