-
Notifications
You must be signed in to change notification settings - Fork 13.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-7784] [kafka-producer] Don't fail TwoPhaseCommitSinkFunction when failing to commit during job recovery #4910
Conversation
eed8fad
to
6424784
Compare
The changes look very good! @pnowojski could you please also have a look at this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One remark for the future. Please try to not mix refactors with real code changes. It is more difficult to review things - for example it is unclear at the first glance whether anything has changed in testFailBeforeNotify
.
// See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1): | ||
// The KafkaProducer may not throw an exception if the transaction failed to commit | ||
if (semantic == Semantic.EXACTLY_ONCE) { | ||
final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Don't we have somewhere implemented similar/same parsing/reading numeric logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't find any. I thought about using the parsing logic from Kafka but it is package private.
* attempt at least one commit of the transaction before giving up.</p> | ||
*/ | ||
@Override | ||
public FlinkKafkaProducer011<IN> disableFailurePropagationAfterTransactionTimeout() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Please move this method to the top, somewhere around setLogFailuresOnly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Moved it.
* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of | ||
* propagated. | ||
*/ | ||
private boolean failureOnCommitAfterTransactionTimeoutDisabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to propagateTransactionTimeouts
or ignoreTransactionTimeouts
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will rename to failurePropagationAfterTransactionTimeoutDisabled
as this name is closer to the method name that controls this field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Part of my concern was that the name is quite long (same applies for the methods). Maybe ignoreFailuresAfterTimeout
? Or ignoreFailuresAfterTransactionTimeout
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I like ignoreFailuresAfterTransactionTimeout
. The method name is now the same, though.
* If a transaction's elapsed time reaches this percentage of the transactionTimeout, a warning | ||
* message will be logged. Value must be in range [0,1]. Negative value disables warnings. | ||
*/ | ||
private double transactionTimeoutWarningRatio = -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer to implement this sanity check in FlinkKafkaProducer011
, since it's a walk around Kafka's bug and unlikely to be useful in general case. And after Kafka 0.11.0.2
or 1.0.0
release we won't need this code anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If exceeding the transactionTimeout
can result in data loss, I prefer the feature here (i.e., TwoPhaseCommitSinkFunction
). You may want to be warned before something bad happens so that parameters can be tuned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not convinced, since this will make our public api larger and more difficult to maintain. However if @aljoscha is ok with that I will yield ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to agree with @pnowojski about the API surface but I think in this case this is a valid safety net for possible future transaction sinks.
* This method must be the only place to call {@link #beginTransaction()} to ensure that the | ||
* {@link TransactionHolder} is created at the same time. | ||
*/ | ||
private TransactionHolder<TXN> beginTransaction0() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this 0
in method's name doesn't help in anything. Maybe rename it to either beginTransactionHolder
, beginTransactionWrapper
, beginTransactionInternal
, beginTransactionAndStartTimeoutTimer
or beginTransactionAndMarkTime
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will rename.
* the configuration parameters {@link #transactionTimeout} and | ||
* {@link #failureOnCommitAfterTransactionTimeoutDisabled} are respected. | ||
*/ | ||
private void recoverAndCommit(TransactionHolder<TXN> transactionHolder) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto: overloading adds confusion, because it suggests that both methods (recoverAndCommit(TXN)
and recoverAndCommit(TransactionHolder)
) are equally valid and could be used interchangeably.
As above, maybe rename to recoverAndCommitHolder
, recoverAndCommitWrapper
, recoverAndCommitInternal
, recoverCommitAndHandleTimeout
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will rename.
} catch (final Exception e) { | ||
final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime; | ||
if (failureOnCommitAfterTransactionTimeoutDisabled && elapsedTime > transactionTimeout) { | ||
log.error("Error while committing transaction {}. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add explicit warning about a data loss? "Error while committing transaction {}. Data loss might occurred"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good. Will add a statement to the log message.
* Adds metadata (currently only the start time of the transaction) to the transaction object. | ||
*/ | ||
@VisibleForTesting | ||
static class TransactionHolder<TXN> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: TransactionWrapper
? - if you prefer Holder
it can be as it is
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will leave it as is. It's an internal class and it is clear what it does.
@@ -194,29 +348,4 @@ public String toString() { | |||
return String.format("FileTransaction[%s]", tmpFile.getName()); | |||
} | |||
} | |||
|
|||
private static class TestContext implements AutoCloseable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you remove this encapsulation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am using
@Rule
public TemporaryFolder folder = new TemporaryFolder();
to create test folders are files which are cleaned up automatically. Since the rule must be a public field in the test, it would have required more work to keep the TestContext
(e.g., new constructor arguments would be needed to pass the files).
FileBasedSinkFunction
is also not static anymore so that it can access the fields of the test directly. I wanted to avoid creating more constructor arguments. Let me know if this is problematic.
private Clock mockClock; | ||
|
||
@Mock | ||
private Logger mockLogger; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will all of those mockLogger
tests fail, if you replace:
log.warn("Transaction {} has been open for {} ms. " +
"This is close to or even exceeding the transaction timeout of {} ms.",
transactionHolder.handle,
elapsedTime,
transactionTimeout);
with
log.warn("Transaction {} has been open for too long. " +
"This is close to or even exceeding the transaction timeout of {} ms.",
transactionHolder.handle,
transactionTimeout);
or
log.warn(String.format("Transaction {} has been open for {} ms. " +
"This is close to or even exceeding the transaction timeout of {} ms.",
transactionHolder.handle,
elapsedTime,
transactionTimeout));
?
If so, then this is a perfect example my I consider mockito to be the definition of evil. Using mockito in tests in 99% cases is duplicating/copying productional code into tests, basically repeating the implementation, which is super fragile and brakes on even the tiniest refactors. While good tests instead should check/assert the actual effects - not whether specific method calls were called and how many times they were called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test is asserting on the presence of the substring This is close to or even exceeding the transaction timeout
in the log message. All your changes to the code would still pass the test except for the 2nd case because there is an assert on the elapsed time. Mockito is used here so that the argument passed to .warn
can be captured. Imo this is not evil as no behaviour is mocked, and actual effects are tested. In my case, logging a message if the transaction is too old is the only effect (see for example testLogTimeoutAlmostReachedWarningDuringCommit
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure? Logger.warn(String, Object...)
, Logger.warn(String, Object)
, Logger.warn(String, Object, Object)
and Logger.warn(String)
are different methods.
This shows, that you are not testing for the effects (warning message being logged somewhere), but you test whether one particular method was called or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, you are right. One would need to implement the org.slf4j.Logger
interface to do it right.
- http://projects.lidalia.org.uk/slf4j-test/ provides a
org.slf4j.Logger
implementation for unit testing but this approach comes with other problems (e.g. classpathDependencyExcludes are not picked up by IntelliJ Mahoney/slf4j-test#15, NOP if log level is disabled
Right now I don't see a way to do it properly. Any help is welcome.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test does not rely on any mocks anymore. I am not 100% happy with it because we use log4j 1.x
which is not maintained anymore and in log4j 2.x
, the APIs have changed a lot: http://logging.apache.org/log4j/2.x/manual/customconfig.html#AddingToCurrent
I propose to leave it as is for now.
private File tmpDirectory; | ||
|
||
@Mock | ||
private Clock mockClock; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please do not use mockito for classes that are so easy to implement and mock in old fashion way (creating SettableClock
seems to be super easy).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you that overusing mocks is an anti-pattern. Changed it to a real implementation. Please have a look. I am not sure if this is better now because after all, the only mocked function on Clock was .millis()
.
@@ -83,49 +79,6 @@ public void before() { | |||
extraProperties.put("isolation.level", "read_committed"); | |||
} | |||
|
|||
@Test(timeout = 30000L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are these removed? Did they never actually test anything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests that I removed were already in FlinkKafkaProducerTests
. Probably some copy paste error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that's right. Those tests are also in FlinkKafkaProducerTest
.
7909b28
to
ecff707
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…e of transaction timeouts. TwoPhaseCommitSinkFunction allows to configure a transaction timeout. The timeout can be used to log warnings if the transaction's age is appraoching the timeout, and it can be used to swallow exceptions that are likely irrecoverable. This commit also integrates these changes to the FlinkKafkaProducer011.
LGTM, thanks for the work @GJL. Merging this .. |
What is the purpose of the change
This makes it possible to configure the TwoPhaseCommitSinkFunction's behaviour w.r.t. transaction timeouts.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation