Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-7784] [kafka-producer] Don't fail TwoPhaseCommitSinkFunction when failing to commit during job recovery #4910

Closed
wants to merge 2 commits into from

Conversation

GJL
Copy link
Member

@GJL GJL commented Oct 26, 2017

What is the purpose of the change

This makes it possible to configure the TwoPhaseCommitSinkFunction's behaviour w.r.t. transaction timeouts.

Brief change log

  • Introduce transaction timeouts to TwoPhaseCommitSinkFunction.
  • Timeout can be used to generate warnings if the transaction's age approaches the timeout.
  • If an exception is thrown during job recovery, the sink can be configured not to propagate the exception and instead log it on ERROR level.

Verifying this change

This change added tests and can be verified as follows:

  • Extended unit tests for TwoPhaseCommitSinkFunction to test added functionality
  • Manually verified the change by running a job with a FlinkKafka011Producer with checkpoint interval 27000 and transaction.timeout.ms = 30000. Warnings were generated correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@GJL
Copy link
Member Author

GJL commented Oct 26, 2017

@aljoscha @pnowojski

@GJL GJL force-pushed the FLINK-7784 branch 2 times, most recently from eed8fad to 6424784 Compare October 27, 2017 13:15
@aljoscha
Copy link
Contributor

The changes look very good! @pnowojski could you please also have a look at this?

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One remark for the future. Please try to not mix refactors with real code changes. It is more difficult to review things - for example it is unclear at the first glance whether anything has changed in testFailBeforeNotify .

// See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
// The KafkaProducer may not throw an exception if the transaction failed to commit
if (semantic == Semantic.EXACTLY_ONCE) {
final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Don't we have somewhere implemented similar/same parsing/reading numeric logic?

Copy link
Member Author

@GJL GJL Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't find any. I thought about using the parsing logic from Kafka but it is package private.

* attempt at least one commit of the transaction before giving up.</p>
*/
@Override
public FlinkKafkaProducer011<IN> disableFailurePropagationAfterTransactionTimeout() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Please move this method to the top, somewhere around setLogFailuresOnly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Moved it.

* If true, any exception thrown in {@link #recoverAndCommit(Object)} will be caught instead of
* propagated.
*/
private boolean failureOnCommitAfterTransactionTimeoutDisabled;
Copy link
Contributor

@pnowojski pnowojski Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to propagateTransactionTimeouts or ignoreTransactionTimeouts?

Copy link
Member Author

@GJL GJL Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will rename to failurePropagationAfterTransactionTimeoutDisabled as this name is closer to the method name that controls this field.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Part of my concern was that the name is quite long (same applies for the methods). Maybe ignoreFailuresAfterTimeout? Or ignoreFailuresAfterTransactionTimeout?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I like ignoreFailuresAfterTransactionTimeout. The method name is now the same, though.

* If a transaction's elapsed time reaches this percentage of the transactionTimeout, a warning
* message will be logged. Value must be in range [0,1]. Negative value disables warnings.
*/
private double transactionTimeoutWarningRatio = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to implement this sanity check in FlinkKafkaProducer011, since it's a walk around Kafka's bug and unlikely to be useful in general case. And after Kafka 0.11.0.2 or 1.0.0 release we won't need this code anymore.

Copy link
Member Author

@GJL GJL Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If exceeding the transactionTimeout can result in data loss, I prefer the feature here (i.e., TwoPhaseCommitSinkFunction). You may want to be warned before something bad happens so that parameters can be tuned.

Copy link
Contributor

@pnowojski pnowojski Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced, since this will make our public api larger and more difficult to maintain. However if @aljoscha is ok with that I will yield ;)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to agree with @pnowojski about the API surface but I think in this case this is a valid safety net for possible future transaction sinks.

* This method must be the only place to call {@link #beginTransaction()} to ensure that the
* {@link TransactionHolder} is created at the same time.
*/
private TransactionHolder<TXN> beginTransaction0() throws Exception {
Copy link
Contributor

@pnowojski pnowojski Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this 0 in method's name doesn't help in anything. Maybe rename it to either beginTransactionHolder, beginTransactionWrapper, beginTransactionInternal, beginTransactionAndStartTimeoutTimer or beginTransactionAndMarkTime?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will rename.

* the configuration parameters {@link #transactionTimeout} and
* {@link #failureOnCommitAfterTransactionTimeoutDisabled} are respected.
*/
private void recoverAndCommit(TransactionHolder<TXN> transactionHolder) {
Copy link
Contributor

@pnowojski pnowojski Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: overloading adds confusion, because it suggests that both methods (recoverAndCommit(TXN) and recoverAndCommit(TransactionHolder)) are equally valid and could be used interchangeably.

As above, maybe rename to recoverAndCommitHolder, recoverAndCommitWrapper, recoverAndCommitInternal, recoverCommitAndHandleTimeout?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will rename.

} catch (final Exception e) {
final long elapsedTime = clock.millis() - transactionHolder.transactionStartTime;
if (failureOnCommitAfterTransactionTimeoutDisabled && elapsedTime > transactionTimeout) {
log.error("Error while committing transaction {}. " +
Copy link
Contributor

@pnowojski pnowojski Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add explicit warning about a data loss? "Error while committing transaction {}. Data loss might occurred"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good. Will add a statement to the log message.

* Adds metadata (currently only the start time of the transaction) to the transaction object.
*/
@VisibleForTesting
static class TransactionHolder<TXN> {
Copy link
Contributor

@pnowojski pnowojski Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: TransactionWrapper? - if you prefer Holder it can be as it is

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will leave it as is. It's an internal class and it is clear what it does.

@@ -194,29 +348,4 @@ public String toString() {
return String.format("FileTransaction[%s]", tmpFile.getName());
}
}

private static class TestContext implements AutoCloseable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove this encapsulation?

Copy link
Member Author

@GJL GJL Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am using

	@Rule
	public TemporaryFolder folder = new TemporaryFolder();

to create test folders are files which are cleaned up automatically. Since the rule must be a public field in the test, it would have required more work to keep the TestContext (e.g., new constructor arguments would be needed to pass the files).

FileBasedSinkFunction is also not static anymore so that it can access the fields of the test directly. I wanted to avoid creating more constructor arguments. Let me know if this is problematic.

private Clock mockClock;

@Mock
private Logger mockLogger;
Copy link
Contributor

@pnowojski pnowojski Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will all of those mockLogger tests fail, if you replace:

			log.warn("Transaction {} has been open for {} ms. " +
					"This is close to or even exceeding the transaction timeout of {} ms.",
				transactionHolder.handle,
				elapsedTime,
				transactionTimeout);

with

			log.warn("Transaction {} has been open for too long. " +
					"This is close to or even exceeding the transaction timeout of {} ms.",
				transactionHolder.handle,
				transactionTimeout);

or

			log.warn(String.format("Transaction {} has been open for {} ms. " +
					"This is close to or even exceeding the transaction timeout of {} ms.",
				transactionHolder.handle,
				elapsedTime,
				transactionTimeout));

?
If so, then this is a perfect example my I consider mockito to be the definition of evil. Using mockito in tests in 99% cases is duplicating/copying productional code into tests, basically repeating the implementation, which is super fragile and brakes on even the tiniest refactors. While good tests instead should check/assert the actual effects - not whether specific method calls were called and how many times they were called.

Copy link
Member Author

@GJL GJL Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is asserting on the presence of the substring This is close to or even exceeding the transaction timeout in the log message. All your changes to the code would still pass the test except for the 2nd case because there is an assert on the elapsed time. Mockito is used here so that the argument passed to .warn can be captured. Imo this is not evil as no behaviour is mocked, and actual effects are tested. In my case, logging a message if the transaction is too old is the only effect (see for example testLogTimeoutAlmostReachedWarningDuringCommit).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure? Logger.warn(String, Object...), Logger.warn(String, Object), Logger.warn(String, Object, Object) and Logger.warn(String) are different methods.

This shows, that you are not testing for the effects (warning message being logged somewhere), but you test whether one particular method was called or not.

Copy link
Member Author

@GJL GJL Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, you are right. One would need to implement the org.slf4j.Logger interface to do it right.

Right now I don't see a way to do it properly. Any help is welcome.

Copy link
Member Author

@GJL GJL Oct 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test does not rely on any mocks anymore. I am not 100% happy with it because we use log4j 1.x which is not maintained anymore and in log4j 2.x, the APIs have changed a lot: http://logging.apache.org/log4j/2.x/manual/customconfig.html#AddingToCurrent

I propose to leave it as is for now.

private File tmpDirectory;

@Mock
private Clock mockClock;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please do not use mockito for classes that are so easy to implement and mock in old fashion way (creating SettableClock seems to be super easy).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you that overusing mocks is an anti-pattern. Changed it to a real implementation. Please have a look. I am not sure if this is better now because after all, the only mocked function on Clock was .millis().

@@ -83,49 +79,6 @@ public void before() {
extraProperties.put("isolation.level", "read_committed");
}

@Test(timeout = 30000L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these removed? Did they never actually test anything?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests that I removed were already in FlinkKafkaProducerTests. Probably some copy paste error.

Copy link
Contributor

@pnowojski pnowojski Oct 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's right. Those tests are also in FlinkKafkaProducerTest.

@GJL GJL force-pushed the FLINK-7784 branch 5 times, most recently from 7909b28 to ecff707 Compare October 31, 2017 09:31
Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

GJL added 2 commits November 1, 2017 11:17
…e of transaction timeouts.

TwoPhaseCommitSinkFunction allows to configure a transaction timeout. The
timeout can be used to log warnings if the transaction's age is appraoching
the timeout, and it can be used to swallow exceptions that are likely
irrecoverable. This commit also integrates these changes to the
FlinkKafkaProducer011.
@tzulitai
Copy link
Contributor

tzulitai commented Nov 2, 2017

LGTM, thanks for the work @GJL. Merging this ..

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants