Avoids reading all fate ids into memory. #4129

Merged
keith-turner merged 4 commits into apache:elasticity from keith-turner:fate-stream
Jan 5, 2024

@keith-turner
Contributor

As we change Accumulo to use FATE for per-tablet operations, it's important to avoid reading all of FATE's persisted data into memory. This commit modifies FATE to use Streams internally instead of Collections. For the Accumulo implementation of FATE storage, this makes it possible to have a Java stream backed by a scanner, which avoids reading all of the FATE ids into memory. The ZooKeeper storage implementation will still read everything into memory.
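
To illustrate the idea (this is not the code in this PR), here is a minimal sketch of a stream backed by an iterator-style source. `FakeScanner`, `list`, and `firstIdAtLeast` are hypothetical names standing in for an Accumulo scanner and the FATE store methods. The point is that a short-circuiting consumer pulls only the rows it needs, and closing the stream closes the underlying source.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyFateIds {

  // Hypothetical stand-in for a scanner: yields ids one at a time and counts reads.
  static class FakeScanner implements AutoCloseable {
    final AtomicInteger rowsRead = new AtomicInteger();
    final AtomicBoolean closed = new AtomicBoolean();
    private final long totalRows;

    FakeScanner(long totalRows) {
      this.totalRows = totalRows;
    }

    Stream<Long> stream() {
      Iterator<Long> iter = new Iterator<Long>() {
        private long cursor = 0;

        @Override
        public boolean hasNext() {
          return cursor < totalRows;
        }

        @Override
        public Long next() {
          if (!hasNext()) {
            throw new NoSuchElementException();
          }
          rowsRead.incrementAndGet();
          return cursor++;
        }
      };
      // Sequential, lazily evaluated stream over the iterator.
      return StreamSupport.stream(
          Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED), false);
    }

    @Override
    public void close() {
      closed.set(true);
    }
  }

  // Ties the scanner's lifetime to the stream: closing the stream closes the scanner.
  static Stream<Long> list(FakeScanner scanner) {
    return scanner.stream().onClose(scanner::close);
  }

  // Callers use try-with-resources so the scanner is always released.
  static long firstIdAtLeast(FakeScanner scanner, long min) {
    try (Stream<Long> ids = list(scanner)) {
      return ids.filter(id -> id >= min).findFirst().orElse(-1L);
    }
  }

  public static void main(String[] args) {
    FakeScanner scanner = new FakeScanner(1_000_000);
    System.out.println("first id >= 5: " + firstIdAtLeast(scanner, 5));
    System.out.println("rows read: " + scanner.rowsRead.get());
    System.out.println("scanner closed: " + scanner.closed.get());
  }
}
```

A Collection-returning API would have to materialize all one million ids before the caller could look at the first one. With the stream, only a handful of rows are pulled before `findFirst` stops.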

Another change in this PR optimizes the Accumulo storage layer to read the status while reading the id. Before this change, ids were read from a scanner, and then a separate scanner was created for each id to read its status. Now the status and id are read in a stream from the same scanner, which should be much faster. This change was not possible for ZooKeeper; it will still make an RPC to get each status. It's OK that the ZooKeeper store is less efficient, as the Accumulo store will likely store orders of magnitude more data. It's probably not possible to make the same optimizations for speed and memory in the ZooKeeper store.
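
The difference between the two access patterns can be sketched with an in-memory table that counts how many scans are opened. Everything here (`FakeTable`, `perIdLookup`, `singlePass`, and the reduced `TStatus` enum) is a hypothetical simplification for illustration, not the storage code from this PR.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StatusWithId {

  // Reduced, illustrative status enum.
  enum TStatus { NEW, IN_PROGRESS, SUCCESSFUL }

  // Hypothetical table: each row maps a fate id to its status.
  static class FakeTable {
    final Map<Long, TStatus> rows = new TreeMap<>();
    int scansOpened = 0;

    // Full scan returning id and status together.
    Stream<Map.Entry<Long, TStatus>> scanAll() {
      scansOpened++;
      return rows.entrySet().stream();
    }

    // Point lookup that opens its own scan.
    TStatus scanOne(long id) {
      scansOpened++;
      return rows.get(id);
    }
  }

  // Old pattern: read the ids, then open one more scan per id for its status.
  static Map<Long, TStatus> perIdLookup(FakeTable table) {
    List<Long> ids = table.scanAll().map(Map.Entry::getKey).collect(Collectors.toList());
    Map<Long, TStatus> result = new TreeMap<>();
    for (long id : ids) {
      result.put(id, table.scanOne(id));
    }
    return result;
  }

  // New pattern: take the id and the status from the same scan.
  static Map<Long, TStatus> singlePass(FakeTable table) {
    Map<Long, TStatus> result = new TreeMap<>();
    table.scanAll().forEach(e -> result.put(e.getKey(), e.getValue()));
    return result;
  }

  static FakeTable sampleTable() {
    FakeTable table = new FakeTable();
    table.rows.put(1L, TStatus.NEW);
    table.rows.put(2L, TStatus.IN_PROGRESS);
    table.rows.put(3L, TStatus.SUCCESSFUL);
    return table;
  }

  public static void main(String[] args) {
    FakeTable a = sampleTable();
    perIdLookup(a);
    System.out.println("per-id lookup scans: " + a.scansOpened); // prints 4 (1 + 3 rows)

    FakeTable b = sampleTable();
    singlePass(b);
    System.out.println("single-pass scans: " + b.scansOpened); // prints 1
  }
}
```

With N transactions, the per-id pattern opens N + 1 scans while the single-pass pattern opens one, which is why the gap grows with the amount of stored data.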

A bug in the Fate integration test was fixed by using the UNKNOWN status, which represents the status of a transaction that does not exist in the persisted store. I ran into this bug while testing these changes.
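
As a rough illustration of that convention, the sketch below shows a store that reports UNKNOWN for an id it has no record of, so a test can assert on the returned status after a delete. `FakeStore` and the reduced `TStatus` enum are hypothetical and only model the behavior described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class UnknownStatusDemo {

  // Reduced, illustrative status enum; UNKNOWN means "no such transaction in the store".
  enum TStatus { NEW, IN_PROGRESS, SUCCESSFUL, UNKNOWN }

  // Hypothetical in-memory store keyed by transaction id.
  static class FakeStore {
    private final Map<Long, TStatus> statuses = new ConcurrentHashMap<>();

    void create(long txid) {
      statuses.put(txid, TStatus.NEW);
    }

    void delete(long txid) {
      statuses.remove(txid);
    }

    // A missing transaction yields UNKNOWN instead of an exception.
    TStatus getStatus(long txid) {
      return statuses.getOrDefault(txid, TStatus.UNKNOWN);
    }
  }

  public static void main(String[] args) {
    FakeStore store = new FakeStore();
    store.create(42L);
    System.out.println(store.getStatus(42L)); // prints NEW
    store.delete(42L);
    System.out.println(store.getStatus(42L)); // prints UNKNOWN
  }
}
```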

@keith-turner keith-turner requested a review from cshannon January 4, 2024 23:30
Contributor

@cshannon cshannon left a comment

LGTM. The changes here are nice, especially optimizing the status lookup.

Comment on lines +79 to +86
return scanner.stream().onClose(scanner::close).map(e -> {
  return new FateIdStatus(parseTid(e.getKey().getRow().toString())) {
    @Override
    public TStatus getStatus() {
      return TStatus.valueOf(e.getValue().toString());
    }
  };
});
Contributor

Suggested change
- return scanner.stream().onClose(scanner::close).map(e -> {
-   return new FateIdStatus(parseTid(e.getKey().getRow().toString())) {
-     @Override
-     public TStatus getStatus() {
-       return TStatus.valueOf(e.getValue().toString());
-     }
-   };
- });
+ return scanner.stream().onClose(scanner::close)
+     .map(e -> new FateIdStatus(parseTid(e.getKey().getRow().toString())) {
+       @Override
+       public TStatus getStatus() {
+         return TStatus.valueOf(e.getValue().toString());
+       }
+     });

Contributor

Just a small simplification which you can ignore

Contributor Author

I tried removing the code block when writing this and thought there was just too much going on in a single line. I decided to add the code block back so that the lambda declaration and the anonymous class declaration were on separate lines.
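
For readers comparing the two forms, here is a small self-contained sketch showing that the block-bodied and expression-bodied lambdas behave the same, so the choice is purely about readability. `IdStatus` is a hypothetical stand-in for `FateIdStatus`, and the rows are made-up sample data.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LambdaForms {

  // Hypothetical stand-in for FateIdStatus: id fixed at construction, status read on demand.
  abstract static class IdStatus {
    final long id;

    IdStatus(long id) {
      this.id = id;
    }

    abstract String getStatus();

    String describe() {
      return id + "=" + getStatus();
    }
  }

  // Block-bodied lambda: lambda and anonymous class start on separate lines.
  static Stream<IdStatus> blockBody(Stream<String[]> rows) {
    return rows.map(r -> {
      return new IdStatus(Long.parseLong(r[0])) {
        @Override
        String getStatus() {
          return r[1];
        }
      };
    });
  }

  // Expression-bodied lambda: same behavior, one less level of nesting.
  static Stream<IdStatus> expressionBody(Stream<String[]> rows) {
    return rows.map(r -> new IdStatus(Long.parseLong(r[0])) {
      @Override
      String getStatus() {
        return r[1];
      }
    });
  }

  // Made-up sample rows of {id, status}.
  static Stream<String[]> sampleRows() {
    return Stream.of(new String[] {"1", "NEW"}, new String[] {"2", "FAILED"});
  }

  static List<String> describeAll(Stream<IdStatus> statuses) {
    return statuses.map(IdStatus::describe).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(describeAll(blockBody(sampleRows()))); // prints [1=NEW, 2=FAILED]
    System.out.println(describeAll(expressionBody(sampleRows()))); // prints [1=NEW, 2=FAILED]
  }
}
```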

  assertEquals(1, callStarted.getCount());
  fate.delete(txid);
- assertThrows(getNoTxExistsException(), () -> getTxStatus(sctx, txid));
+ assertEquals(UNKNOWN, getTxStatus(sctx, txid));
Contributor

I'm glad you fixed this issue. I made the comment about the race condition because I encountered the same bug, and I forgot to make a follow-on issue to fix the test, which kept failing for me on occasion too.

@keith-turner keith-turner merged commit f46b09a into apache:elasticity Jan 5, 2024
@keith-turner keith-turner deleted the fate-stream branch January 5, 2024 14:04
@ctubbsii ctubbsii added this to the 4.0.0 milestone Jul 12, 2024
3 participants