Skip to content

Commit 353e52d

Browse files
authored
transaction: Handle commit_ts expired error (tikv#521)
Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent 02e3a8e commit 353e52d

4 files changed

Lines changed: 60 additions & 9 deletions

File tree

src/kv/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ pub use key::KvPairTTL;
1414
pub use kvpair::KvPair;
1515
pub use value::Value;
1616

17-
struct HexRepr<'a>(pub &'a [u8]);
17+
pub struct HexRepr<'a>(pub &'a [u8]);
1818

1919
impl fmt::Display for HexRepr<'_> {
2020
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {

src/store/errors.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::fmt::Display;
4-
53
use crate::proto::kvrpcpb;
64
use crate::Error;
75

@@ -162,11 +160,15 @@ impl HasKeyErrors for kvrpcpb::PessimisticRollbackResponse {
162160
}
163161
}
164162

165-
impl<T: HasKeyErrors, E: Display> HasKeyErrors for Result<T, E> {
163+
impl<T: HasKeyErrors> HasKeyErrors for Result<T, Error> {
166164
fn key_errors(&mut self) -> Option<Vec<Error>> {
167165
match self {
168166
Ok(x) => x.key_errors(),
169-
Err(e) => Some(vec![Error::StringError(e.to_string())]),
167+
Err(Error::MultipleKeyErrors(errs)) => Some(std::mem::take(errs)),
168+
Err(e) => Some(vec![std::mem::replace(
169+
e,
170+
Error::StringError("".to_string()), // placeholder, no use.
171+
)]),
170172
}
171173
}
172174
}

src/transaction/transaction.rs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ use std::time::Instant;
99
use derive_new::new;
1010
use fail::fail_point;
1111
use futures::prelude::*;
12-
use log::warn;
13-
use log::{debug, trace};
12+
use log::{debug, error, info, trace, warn};
1413
use tokio::time::Duration;
1514

1615
use crate::backoff::Backoff;
1716
use crate::backoff::DEFAULT_REGION_BACKOFF;
17+
use crate::kv::HexRepr;
1818
use crate::pd::PdClient;
1919
use crate::pd::PdRpcClient;
2020
use crate::proto::kvrpcpb;
@@ -1293,7 +1293,7 @@ impl<PdC: PdClient> Committer<PdC> {
12931293
// FIXME: min_commit_ts == 0 => fallback to normal 2PC
12941294
min_commit_ts.unwrap()
12951295
} else {
1296-
match self.commit_primary().await {
1296+
match self.commit_primary_with_retry().await {
12971297
Ok(commit_ts) => commit_ts,
12981298
Err(e) => {
12991299
return if self.undetermined {
@@ -1398,6 +1398,11 @@ impl<PdC: PdClient> Committer<PdC> {
13981398
.plan();
13991399
plan.execute()
14001400
.inspect_err(|e| {
1401+
debug!(
1402+
"commit primary error: {:?}, start_ts: {}",
1403+
e,
1404+
self.start_version.version()
1405+
);
14011406
// We don't know whether the transaction is committed or not if we fail to receive
14021407
// the response. Then, we mark the transaction as undetermined and propagate the
14031408
// error to the user.
@@ -1410,6 +1415,48 @@ impl<PdC: PdClient> Committer<PdC> {
14101415
Ok(commit_version)
14111416
}
14121417

1418+
async fn commit_primary_with_retry(&mut self) -> Result<Timestamp> {
1419+
loop {
1420+
match self.commit_primary().await {
1421+
Ok(commit_version) => return Ok(commit_version),
1422+
Err(Error::ExtractedErrors(mut errors)) => match errors.pop() {
1423+
Some(Error::KeyError(key_err)) => {
1424+
if let Some(expired) = key_err.commit_ts_expired {
1425+
// Ref: https://github.com/tikv/client-go/blob/tidb-8.5/txnkv/transaction/commit.go
1426+
info!("2PC commit_ts rejected by TiKV, retry with a newer commit_ts, start_ts: {}",
1427+
self.start_version.version());
1428+
1429+
let primary_key = self.primary_key.as_ref().unwrap();
1430+
if primary_key != expired.key.as_ref() {
1431+
error!("2PC commit_ts rejected by TiKV, but the key is not the primary key, start_ts: {}, key: {}, primary: {:?}",
1432+
self.start_version.version(), HexRepr(&expired.key), primary_key);
1433+
return Err(Error::StringError("2PC commitTS rejected by TiKV, but the key is not the primary key".to_string()));
1434+
}
1435+
1436+
// Do not retry for a txn which has a too large min_commit_ts.
1437+
// 3600000 << 18 = 943718400000
1438+
if expired
1439+
.min_commit_ts
1440+
.saturating_sub(expired.attempted_commit_ts)
1441+
> 943718400000
1442+
{
1443+
let msg = format!("2PC min_commit_ts is too large, we got min_commit_ts: {}, and attempted_commit_ts: {}",
1444+
expired.min_commit_ts, expired.attempted_commit_ts);
1445+
return Err(Error::StringError(msg));
1446+
}
1447+
continue;
1448+
} else {
1449+
return Err(Error::KeyError(key_err));
1450+
}
1451+
}
1452+
Some(err) => return Err(err),
1453+
None => unreachable!(),
1454+
},
1455+
Err(err) => return Err(err),
1456+
}
1457+
}
1458+
}
1459+
14131460
async fn commit_secondary(self, commit_version: Timestamp) -> Result<()> {
14141461
debug!("committing secondary");
14151462
let start_version = self.start_version.clone();

tests/common/ctl.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ pub async fn get_region_count() -> Result<u64> {
1616
.text()
1717
.await
1818
.map_err(|e| Error::StringError(e.to_string()))?;
19-
let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap();
19+
let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap_or_else(|err| {
20+
panic!("invalid body: {:?}, error: {:?}", body, err);
21+
});
2022
value["count"]
2123
.as_u64()
2224
.ok_or_else(|| Error::StringError("pd region count does not return an integer".to_owned()))

0 commit comments

Comments
 (0)