fix bug: error will be thrown when date col is not the first col in hash partition table, ref #2537 (#2554) (#2556)
ti-chi-bot authored Sep 23, 2022
1 parent da155a8 commit 3b7d090
Showing 2 changed files with 119 additions and 2 deletions.
@@ -652,7 +652,7 @@ class PartitionWriteSuite extends BaseTiSparkTest {
     tidbStmt.execute(s"ADMIN CHECK TABLE `$database`.`$table`")
   }
 
-  test("test Year() partition when date type is not the first column") {
+  test("test Year() range partition when date type is not the first column") {
     tidbStmt.execute(
       s"create table `$database`.`$table` (name varchar(16), admission_date date, birthday date primary key ) partition by range(YEAR(birthday)) (" +
         s"partition p0 values less than (1995)," +
@@ -706,6 +706,117 @@ class PartitionWriteSuite extends BaseTiSparkTest {
     tidbStmt.execute(s"ADMIN CHECK TABLE `$database`.`$table`")
   }
 
+  test("test Year() hash partition when date type is not the first column") {
+    tidbStmt.execute(
+      s"create table `$database`.`$table` (name varchar(16), admission_date date , birthday date primary key,graduation date ) partition by hash(YEAR(birthday)) PARTITIONS 4 ")
+    val data: RDD[Row] = sc.makeRDD(
+      List(
+        Row(
+          "Luo",
+          Date.valueOf("2002-09-01"),
+          Date.valueOf("1995-06-15"),
+          Date.valueOf("2008-06-15")),
+        Row(
+          "John",
+          Date.valueOf("2002-09-01"),
+          Date.valueOf("1995-08-08"),
+          Date.valueOf("2009-06-15")),
+        Row(
+          "Jack",
+          Date.valueOf("2002-09-01"),
+          Date.valueOf("1993-08-22"),
+          Date.valueOf("2010-06-15")),
+        Row(
+          "Mike",
+          Date.valueOf("2002-09-01"),
+          Date.valueOf("1999-06-04"),
+          Date.valueOf("2011-06-15"))))
+    val schema: StructType =
+      StructType(
+        List(
+          StructField("name", StringType),
+          StructField("admission_date", DateType),
+          StructField("birthday", DateType),
+          StructField("graduation", DateType)))
+    val df = sqlContext.createDataFrame(data, schema)
+    df.write
+      .format("tidb")
+      .options(tidbOptions)
+      .option("database", database)
+      .option("table", table)
+      .mode("append")
+      .save()
+    val insertResultSpark = spark.sql(s"select * from `tidb_catalog`.`$database`.`$table`")
+    insertResultSpark.collect() should contain theSameElementsAs Array(
+      Row(
+        "Luo",
+        Date.valueOf("2002-09-01"),
+        Date.valueOf("1995-06-15"),
+        Date.valueOf("2008-06-15")),
+      Row(
+        "John",
+        Date.valueOf("2002-09-01"),
+        Date.valueOf("1995-08-08"),
+        Date.valueOf("2009-06-15")),
+      Row(
+        "Jack",
+        Date.valueOf("2002-09-01"),
+        Date.valueOf("1993-08-22"),
+        Date.valueOf("2010-06-15")),
+      Row(
+        "Mike",
+        Date.valueOf("2002-09-01"),
+        Date.valueOf("1999-06-04"),
+        Date.valueOf("2011-06-15")))
+    checkPartitionJDBCResult(
+      Map(
+        "p0" -> Array(),
+        "p1" -> Array(
+          Array(
+            "Jack",
+            Date.valueOf("2002-09-01"),
+            Date.valueOf("1993-08-22"),
+            Date.valueOf("2010-06-15"))),
+        "p2" -> Array(),
+        "p3" -> Array(
+          Array(
+            "Luo",
+            Date.valueOf("2002-09-01"),
+            Date.valueOf("1995-06-15"),
+            Date.valueOf("2008-06-15")),
+          Array(
+            "John",
+            Date.valueOf("2002-09-01"),
+            Date.valueOf("1995-08-08"),
+            Date.valueOf("2009-06-15")),
+          Array(
+            "Mike",
+            Date.valueOf("2002-09-01"),
+            Date.valueOf("1999-06-04"),
+            Date.valueOf("2011-06-15")))))
+    spark.sql(
+      s"delete from `tidb_catalog`.`$database`.`$table` where birthday <= '1995-06-15' or name = 'Mike'")
+    val deleteResultSpark = spark.sql(s"select * from `tidb_catalog`.`$database`.`$table`")
+    deleteResultSpark.collect() should contain theSameElementsAs Array(
+      Row(
+        "John",
+        Date.valueOf("2002-09-01"),
+        Date.valueOf("1995-08-08"),
+        Date.valueOf("2009-06-15")))
+    checkPartitionJDBCResult(
+      Map(
+        "p0" -> Array(),
+        "p1" -> Array(),
+        "p2" -> Array(),
+        "p3" -> Array(
+          Array(
+            "John",
+            Date.valueOf("2002-09-01"),
+            Date.valueOf("1995-08-08"),
+            Date.valueOf("2009-06-15")))))
+    tidbStmt.execute(s"ADMIN CHECK TABLE `$database`.`$table`")
+  }
+
   test("unsupported function UNIX_TIMESTAMP() and range partition replace and delete test") {
     tidbStmt.execute(
       s"create table `$database`.`$table` (birthday timestamp primary key , name varchar(16)) partition by range(UNIX_TIMESTAMP(birthday)) (" +
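To see why the new hash-partition test above expects Jack in p1 and Luo, John, and Mike in p3: the table is created with "partition by hash(YEAR(birthday)) PARTITIONS 4", and the write path (see the Java change below) picks the partition as YEAR(birthday) modulo the partition count. A minimal standalone check of that arithmetic, illustrative only (the class and variable names below are mine, not part of the patch):

// Illustrative check of the partition ids asserted by checkPartitionJDBCResult above.
public class HashPartitionIdCheck {
  public static void main(String[] args) {
    String[] names = {"Luo", "John", "Jack", "Mike"};
    int[] birthYears = {1995, 1995, 1993, 1999}; // YEAR(birthday) for each test row
    int partitions = 4; // PARTITIONS 4 in the CREATE TABLE statement
    for (int i = 0; i < names.length; i++) {
      // hash partition id = YEAR(birthday) % number of partitions
      System.out.println(names[i] + " -> p" + (birthYears[i] % partitions));
    }
    // Prints: Luo -> p3, John -> p3, Jack -> p1, Mike -> p3
  }
}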
@@ -177,8 +177,14 @@ private TableCommon locateHashPartition(Row row) {
     // TODO: support more function partition
     FuncCallExpr partitionFuncExpr = (FuncCallExpr) originalExpr;
     if (partitionFuncExpr.getFuncTp() == YEAR) {
+      Expression expression = partitionFuncExpr.getChildren().get(0);
+      ColumnRef columnRef = (ColumnRef) expression;
+      columnRef.resolve(logicalTable.getTableInfo());
       int result =
-          (int) partitionFuncExpr.eval(Constant.create(row.getDate(0), DateType.DATE)).getValue();
+          (int)
+              partitionFuncExpr
+                  .eval(Constant.create(row.get(columnRef.getColumnOffset(), DateType.DATE)))
+                  .getValue();
       int partitionId = result % physicalTables.length;
       return physicalTables[partitionId];
     } else {
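The Java change above is the actual fix: the old code read the partition date with row.getDate(0), which assumes the YEAR() partition column is the table's first column and throws an error when it is not (the bug reported in #2537). The new code resolves the column referenced by YEAR() against the table schema and reads the row value at that column's offset. A condensed sketch of that logic under assumed names (the method signature and the tableInfo and numPartitions parameters are illustrative; TiTableInfo is assumed to be the schema type returned by logicalTable.getTableInfo()):

// Sketch only: condenses the patched locateHashPartition logic for hash(YEAR(<date column>)).
// Types FuncCallExpr, ColumnRef, Row, Constant, DateType come from the surrounding project;
// the method and parameter names here are illustrative, not the patch's exact signature.
static int locateYearHashPartition(
    FuncCallExpr partitionFuncExpr, Row row, TiTableInfo tableInfo, int numPartitions) {
  // YEAR() has a single child expression: the reference to the partition column.
  ColumnRef columnRef = (ColumnRef) partitionFuncExpr.getChildren().get(0);
  // Bind the reference to the table schema so its column offset is known.
  columnRef.resolve(tableInfo);
  // Read the date at the resolved offset instead of hard-coding offset 0.
  Object dateValue = row.get(columnRef.getColumnOffset(), DateType.DATE);
  int year = (int) partitionFuncExpr.eval(Constant.create(dateValue)).getValue();
  // Hash partitioning: the year modulo the partition count selects the physical table.
  return year % numPartitions;
}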
