
Handle expiration of transactions between operator checkpoint and global checkpoint commit #5

Open
StephanEwen opened this issue May 4, 2017 · 2 comments


StephanEwen commented May 4, 2017

Problem description

The FlinkExactlyOncePravegaWriter works similarly to a two-phase-commit (2PC) protocol:

  1. A transaction is created per checkpoint
  2. When Flink does a checkpoint on the writer, it flushes the transaction, and stores its UUID in the checkpoint state (vote-to-commit)
  3. When the checkpoint is complete and the sink receives a notification, that checkpoint's transaction is committed (full commit)

That model assumes that transactions can be committed once the checkpoint's completeness notification is received (step 3). If a transaction times out between step (2) and step (3), there will be data loss.
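The three steps above can be sketched as a tiny state machine. This is a hypothetical, simplified illustration: `TwoPhaseSketch` and its methods are made-up names (the two method names mirror Flink's checkpoint hooks), not the connector's actual code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.UUID;

// Illustrative sketch of the writer's 2PC-like flow, not the real connector.
public class TwoPhaseSketch {

    // Transactions flushed at checkpoint time, waiting for the global commit signal.
    private final Deque<UUID> pendingCommits = new ArrayDeque<>();

    // Step 1: one open transaction per checkpoint interval.
    private UUID currentTxn = UUID.randomUUID();

    /** Step 2: flush the open transaction and record its UUID (vote-to-commit). */
    public UUID snapshotState() {
        UUID flushed = currentTxn;
        pendingCommits.addLast(flushed); // would be stored in checkpoint state
        currentTxn = UUID.randomUUID();  // open the transaction for the next checkpoint
        return flushed;
    }

    /** Step 3: global checkpoint complete -> commit the oldest pending transaction.
     *  If that transaction timed out between steps 2 and 3, this commit fails
     *  and the flushed data is lost -- the window this issue describes. */
    public UUID notifyCheckpointComplete() {
        return pendingCommits.pollFirst();
    }
}
```

The data-loss window is exactly the time a UUID spends in `pendingCommits`: the transaction is already flushed but its timeout clock is still running.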

This is an inherent fragility that seems hard to circumvent with the current primitive, and has to do with transaction timeouts. Given sufficiently long timeouts, this may never be a problem in most setups, but it is not nice to have this weak point in the long run.

Problem location

The problem is in the use of Pravega Transactions in the io.pravega.connectors.flink.FlinkExactlyOncePravegaWriter.

Suggestions for an improvement

Off the top of my head, there are three types of solutions to this issue:

  1. Pravega offers a pre-commit / full-commit distinction in transactions, where a pre-commit means the transaction becomes immutable and the usual timeouts do not apply (except possibly a garbage-prevention timeout, which could be very long). The full commit publishes the temporary segment.

  2. Flink makes sure it can recover transactions, for example by persisting the data in the checkpoints as well. If a transaction timed out, it can open a new transaction and re-write the data. The disadvantage is that, effectively, everything is persisted in two distinct systems (each with its own durability/replication).

  3. Flink adds a way that lets tasks complete a checkpoint completely independently of other tasks. Then the transaction could be committed immediately when the checkpoint is triggered. That would require guaranteeing that the sinks are never affected by a recovery of other tasks. To keep the current consistency guarantees, this would require persisting the result from the input to the sink operation (similar to how, for example, Samza persists every shuffle), which is in some sense not too different from approach (2).
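Suggestion (1) amounts to a transaction-lifecycle change that could look like the following state machine. This is purely a sketch of the proposed semantics, not an existing Pravega API; all names and the timeout handling are assumptions.

```java
// Hypothetical lifecycle for suggestion (1): after a pre-commit, the normal
// timeout no longer aborts the transaction; only a much longer
// garbage-prevention timeout still applies.
public class TxnStateSketch {
    public enum State { OPEN, PRE_COMMITTED, COMMITTED, ABORTED }

    private State state = State.OPEN;
    private final long normalTimeoutMs;
    private final long gcTimeoutMs; // garbage-prevention bound; could be very long

    public TxnStateSketch(long normalTimeoutMs, long gcTimeoutMs) {
        this.normalTimeoutMs = normalTimeoutMs;
        this.gcTimeoutMs = gcTimeoutMs;
    }

    /** Vote-to-commit: the transaction becomes immutable. */
    public void preCommit() { if (state == State.OPEN) state = State.PRE_COMMITTED; }

    /** Full commit: publishes the temporary segment (reduced here to a state flip). */
    public void commit() { if (state == State.PRE_COMMITTED) state = State.COMMITTED; }

    /** Timeout check at transaction age ageMs: OPEN aborts at the normal
     *  timeout, PRE_COMMITTED only at the garbage-prevention timeout. */
    public void expire(long ageMs) {
        if (state == State.OPEN && ageMs >= normalTimeoutMs) {
            state = State.ABORTED;
        } else if (state == State.PRE_COMMITTED && ageMs >= gcTimeoutMs) {
            state = State.ABORTED;
        }
    }

    public State state() { return state; }
}
```

The key property for the Flink writer: a transaction that reached PRE_COMMITTED at step (2) can no longer be aborted by the normal timeout before the step (3) commit arrives.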

@StephanEwen StephanEwen changed the title Handle expiration of transactions between operator checkpoint and checkpoint commit Handle expiration of transactions between operator checkpoint and global checkpoint commit May 4, 2017

tkaitchuck commented May 22, 2017

In addition to all of the above, I can imagine several possible solutions:

  • Having all the writers use the same transaction. Some master process could create one and send its ID to the writers. Then, once all the writers flush and vote to commit, the checkpoint can be committed by the master process.
    • If this transactionality needs to be interlocked with another file, it could ping the transaction, record 'going to commit' in the file, and then commit the TXN. Then it updates the file to say it committed. If it dies while in the 'going to commit' phase, the replacement process can query the transaction to see whether it committed, and can then commit it if it was not already.
  • If for some reason a single transaction does not work, it might be possible for us to come up with a scheme whereby many transactions can be created but where their commit or aborts will be all or nothing. (I'd need to understand the requirements better to flesh this out)
  • Setting ludicrously long timeouts. The bound for a timeout is under application control, so it could be set to Long.MAX_VALUE. The risk here is that if a transaction were left open and forgotten it would need to be manually cleaned up, otherwise the space would be lost, and some operations like sealing the stream or scaling couldn't be performed.
    • As mentioned above having a special 'pre-commit state' which is the only time the long timeout applies would reduce the probability of this impacting things poorly.
  • We could do something in between the partial-commit and double-write solutions mentioned above, where a transaction is configured, upon timeout, not to be deleted but simply to become uncommittable. The data written to it could still be read back (as though it were its own stream). This resolves the transaction from the perspective of the main stream (it will never be committed), but the data does not go away. Transactions could conceivably be configured with one timeout before going into this uncommittable state and another before being deleted. This would allow the system to be configured on the assumption that the transaction will commit within a few minutes; if that is wrong, in the worst case the data has to be read and rewritten, but in the happy case this is not required.
  • We might be able to lift the timeout requirement on transactions, but it would take some significant effort. Part of the reason for the timeout is to clean up stale data that is unwanted, so we would need to add an API to list the transactions and information about them, so that applications could do this work manually. We would probably also want the transaction ID to be application-provided, so the app has an opportunity to store the ID before the transaction is created, to avoid the case where it dies right after creation and leaves a stale empty transaction open. Additionally, there would be quite a bit of work to maintain ordering guarantees in the presence of scaling events while there are open long-running transactions.
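The 'going to commit' interlock from the first bullet can be sketched with in-memory stand-ins for the durable file and the transaction store. All names here are hypothetical; the point is the write-ahead ordering (intent, commit, outcome) and the idempotent recovery path.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory sketch of the 'going to commit' interlock: record intent durably,
// commit the transaction, then record the outcome. A replacement process that
// finds the intent but no outcome queries the txn and finishes the commit.
public class CommitInterlock {
    public enum LogEntry { GOING_TO_COMMIT, COMMITTED }
    public enum TxnStatus { OPEN, COMMITTED }

    private final Map<String, LogEntry> durableLog = new HashMap<>(); // stand-in for "the file"
    private final Map<String, TxnStatus> txns = new HashMap<>();      // stand-in for txn state

    public void begin(String txnId) { txns.put(txnId, TxnStatus.OPEN); }

    /** Happy path: intent, then commit, then outcome. */
    public void commit(String txnId) {
        durableLog.put(txnId, LogEntry.GOING_TO_COMMIT);
        txns.put(txnId, TxnStatus.COMMITTED);
        durableLog.put(txnId, LogEntry.COMMITTED);
    }

    /** Simulates the process dying right after recording intent. */
    public void crashAfterIntent(String txnId) {
        durableLog.put(txnId, LogEntry.GOING_TO_COMMIT);
    }

    /** Replacement process: if intent was logged but the txn is not yet
     *  committed, commit it now; then record the outcome. Safe to re-run. */
    public void recover(String txnId) {
        if (durableLog.get(txnId) == LogEntry.GOING_TO_COMMIT) {
            if (txns.get(txnId) != TxnStatus.COMMITTED) {
                txns.put(txnId, TxnStatus.COMMITTED);
            }
            durableLog.put(txnId, LogEntry.COMMITTED);
        }
    }

    public TxnStatus status(String txnId) { return txns.get(txnId); }
}
```

Because recovery first queries whether the commit already happened, a crash at any point between intent and outcome leaves the system recoverable without double-committing.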

@vijikarthi commented

> Pravega offers a pre-commit / full-commit distinction in transactions, where a pre-commit means the transaction becomes immutable and the usual timeouts do not apply (except possibly a garbage-prevention timeout which could be very long). The full commit publishes the temporary segment.

For applications like Flink that guarantee the triggering of txn.commit() once the pre-commit phase is successfully complete, I think providing an API option to ignore the transaction timeout could be one of the options to consider to mitigate the issue.

For example, during the pre-commit phase, the writer client could invoke txn.flush(boolean ignoreTimeOut), indicating that txn.commit() will eventually be called and hence the transaction timeout interval can be ignored (or defaults to a much larger value in case the txn needs to be aborted later). There is also a possibility of the transaction timing out while txn.flush() is being called; in that case, the flush should not happen, allowing Flink to restart the job according to the restart strategy configured for the job.
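The proposed contract could look like the following. Note that txn.flush(boolean ignoreTimeOut) is a hypothetical API from the comment above, not something Pravega offers today; the transaction is reduced here to two flags to show only the decision logic.

```java
// Hypothetical contract for the proposed txn.flush(boolean ignoreTimeOut):
// the flush succeeds and suspends the normal timeout only if the transaction
// has not already expired; otherwise it fails so Flink can restart the job.
public class FlushSketch {
    private final boolean expired;   // whether the txn already timed out
    private boolean timeoutIgnored;  // set once a pre-commit flush succeeds

    public FlushSketch(boolean expired) { this.expired = expired; }

    /** Returns false if the txn already timed out: the flush must not happen,
     *  and the caller fails the checkpoint, triggering the restart strategy. */
    public boolean flush(boolean ignoreTimeOut) {
        if (expired) {
            return false;
        }
        this.timeoutIgnored = ignoreTimeOut; // txn.commit() is promised to follow
        return true;
    }

    public boolean timeoutIgnored() { return timeoutIgnored; }
}
```

The important edge case is the second branch: an already-expired transaction must reject the flush rather than silently suspend a timeout that has already fired.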

Going by the current defaults, the default transaction timeout will be 10 seconds times the multiplier 1000, i.e., 10 seconds * 1000 = 10,000 seconds (~2.7 hours), or a max of 24 hours (if a custom configuration value exceeds the default minimum) according to this calculation. So it is likely that we will not hit the timeout issue, but it remains a possibility.
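The arithmetic above, under the stated assumptions (10-second lease, multiplier of 1000, 24-hour cap), works out as:

```java
// Reproduces the timeout calculation above: lease * multiplier, capped.
// The specific defaults (10 s, 1000, 24 h) are taken from the comment, not
// from reading the current Pravega configuration.
public class TimeoutMath {
    public static long effectiveTimeoutSeconds(long leaseSeconds, long multiplier, long capSeconds) {
        return Math.min(leaseSeconds * multiplier, capSeconds);
    }
}
```

With these inputs: 10 * 1000 = 10,000 seconds, which is about 2.78 hours, well under the 24-hour (86,400-second) cap; the cap only bites once the lease exceeds 86.4 seconds.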
