Skip to content

Commit

Permalink
WIP; tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Bouncheck committed Nov 26, 2024
1 parent 1cfc2df commit 925a498
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 0 deletions.
11 changes: 11 additions & 0 deletions driver-core/src/main/java/com/datastax/driver/core/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,17 @@ public class Metadata {
Metadata(Cluster.Manager cluster) {
this.cluster = cluster;
this.tabletMap = TabletMap.emptyMap(cluster);
if (cluster.configuration.getQueryOptions().isMetadataEnabled()) {
cluster
.getCluster()
.register(
new TabletMapListener(tabletMap) {
@Override
public void onRegister(Cluster cluster) {
System.out.println("Registered listener for tablet map");
}
});
}
}

// rebuilds the token map with the current hosts, typically when refreshing schema metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ public TypeCodec<TupleValue> getTabletPayloadCodec() {
return tabletPayloadCodec;
}

public void removeTableMappings(KeyspaceTableNamePair key) {
this.mapping.remove(key);
}

public void removeTableMappings(String keyspace, String table) {
removeTableMappings(new KeyspaceTableNamePair(keyspace, table));
}

/**
* Simple class to hold UUID of a host and a shard number. Class itself makes no checks or
* guarantees about existence of a shard on corresponding host.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.datastax.driver.core;

public class TabletMapListener extends SchemaChangeListenerBase {
private final TabletMap tabletMap;

public TabletMapListener(TabletMap tabletMap) {
this.tabletMap = tabletMap;
}

@Override
public void onTableChanged(TableMetadata current, TableMetadata previous) {
tabletMap.removeTableMappings(previous.getKeyspace().getName(), previous.getName());
}

@Override
public void onTableRemoved(TableMetadata table) {
tabletMap.removeTableMappings(table.getKeyspace().getName(), table.getName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package com.datastax.driver.core;

import static com.datastax.driver.core.Assertions.assertThat;
import static com.datastax.driver.core.Metadata.handleId;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.utils.ScyllaVersion;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@CCMConfig(
createCluster = false,
numberOfNodes = 2,
jvmArgs = {
"--experimental-features=consistent-topology-changes",
"--experimental-features=tablets"
})
@ScyllaVersion(minOSS = "6.0.0", minEnterprise = "2024.2", description = "Needs to support tablets")
public class TabletMapListenerTest extends CCMTestsSupport {
private static final int INITIAL_TABLETS = 32;
private static final String KEYSPACE_NAME = "listenerTest";
private static final String TABLE_NAME = "testTable";
private static final String CREATE_TABLETS_KEYSPACE_QUERY =
"CREATE KEYSPACE "
+ KEYSPACE_NAME
+ " WITH replication = {'class': "
+ "'NetworkTopologyStrategy', "
+ "'replication_factor': '1'} AND durable_writes = true AND tablets = "
+ "{'initial': "
+ INITIAL_TABLETS
+ "};";

private static final String CREATE_KEYSPACE = CREATE_TABLETS_KEYSPACE_QUERY;
private static final String ALTER_KEYSPACE =
"ALTER KEYSPACE " + KEYSPACE_NAME + " WITH durable_writes = false";
private static final String DROP_KEYSPACE = "DROP KEYSPACE " + KEYSPACE_NAME;

private static final String CREATE_TABLE =
"CREATE TABLE " + KEYSPACE_NAME + "." + TABLE_NAME + "(i int primary key)";
private static final String INSERT_QUERY_TEMPLATE =
"INSERT INTO " + KEYSPACE_NAME + "." + TABLE_NAME + " (i) VALUES (%s)";
private static final String INSERT_ALTERED_TEMPLATE =
"INSERT INTO " + KEYSPACE_NAME + "." + TABLE_NAME + " (i,j) VALUES (%s,%s)";
private static final String SELECT_PK_WHERE =
"SELECT i FROM " + KEYSPACE_NAME + "." + TABLE_NAME + " WHERE i = ?";
private static final String ALTER_TABLE =
"ALTER TABLE " + KEYSPACE_NAME + "." + TABLE_NAME + " ADD j int";
private static final String DROP_TABLE = "DROP TABLE " + KEYSPACE_NAME + "." + TABLE_NAME;

/** The maximum time that the test will wait to check that listeners have been notified. */
private static final long NOTIF_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);

Cluster cluster;
Session session;
SchemaChangeListener listener;
List<SchemaChangeListener> listeners;

@BeforeClass(groups = "short")
public void setup() throws InterruptedException {
cluster =
createClusterBuilderNoDebouncing().withLoadBalancingPolicy(new RoundRobinPolicy()).build();

session = cluster.connect();

cluster.register(listener = mock(TabletMapListener.class));
listeners = Lists.newArrayList(listener);
}

// Checks for tablet removal both on table update and removal
@Test(groups = "short")
public void should_remove_tablets_on_table_alterations() throws InterruptedException {

session.execute(CREATE_KEYSPACE);
ArgumentCaptor<KeyspaceMetadata> added = null;
for (SchemaChangeListener listener : listeners) {
added = ArgumentCaptor.forClass(KeyspaceMetadata.class);
verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)).onKeyspaceAdded(added.capture());
assertThat(added.getValue()).hasName(handleId(KEYSPACE_NAME));
}
assert added != null;

TabletMap tabletMap;
tabletMap = cluster.getMetadata().getTabletMap();

session.execute(CREATE_TABLE);
assertThat(tabletMap.getMapping())
.doesNotContainKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(String.format(INSERT_QUERY_TEMPLATE, "42"));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
assertThat(tabletMap.getMapping())
.containsKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(ALTER_TABLE);
for (SchemaChangeListener listener : listeners) {
ArgumentCaptor<TableMetadata> current = ArgumentCaptor.forClass(TableMetadata.class);
ArgumentCaptor<TableMetadata> previous = ArgumentCaptor.forClass(TableMetadata.class);
verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1))
.onTableChanged(current.capture(), previous.capture());
assertThat(previous.getValue().getKeyspace()).hasName(handleId(KEYSPACE_NAME));
assertThat(previous.getValue()).hasName(handleId(TABLE_NAME));
}
assertThat(tabletMap.getMapping())
.doesNotContainKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(String.format(INSERT_ALTERED_TEMPLATE, "42", "42"));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
assertThat(tabletMap.getMapping())
.containsKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(DROP_TABLE);
ArgumentCaptor<TableMetadata> removed = null;
for (SchemaChangeListener listener : listeners) {
removed = ArgumentCaptor.forClass(TableMetadata.class);
verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)).onTableRemoved(removed.capture());
assertThat(removed.getValue().getKeyspace()).hasName(handleId(KEYSPACE_NAME));
assertThat(removed.getValue()).hasName(handleId(TABLE_NAME));
}
assert removed != null;
assertThat(tabletMap.getMapping())
.doesNotContainKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

session.execute(DROP_KEYSPACE);
}

@Test(groups = "short")
public void should_remove_tablets_on_keyspace_alterations() throws InterruptedException {
session.execute(CREATE_KEYSPACE);
ArgumentCaptor<KeyspaceMetadata> added = null;
for (SchemaChangeListener listener : listeners) {
added = ArgumentCaptor.forClass(KeyspaceMetadata.class);
verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1)).onKeyspaceAdded(added.capture());
assertThat(added.getValue()).hasName(handleId(KEYSPACE_NAME));
}
assert added != null;

TabletMap tabletMap;
tabletMap = cluster.getMetadata().getTabletMap();

session.execute(CREATE_TABLE);
session.execute(String.format(INSERT_QUERY_TEMPLATE, "42"));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
session.execute(session.prepare(SELECT_PK_WHERE).bind(42));
assertThat(tabletMap.getMapping())
.containsKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));

assertThat(cluster.getMetadata().getKeyspace(KEYSPACE_NAME).isDurableWrites()).isTrue();

session.execute(ALTER_KEYSPACE);
for (SchemaChangeListener listener : listeners) {
ArgumentCaptor<TableMetadata> current = ArgumentCaptor.forClass(TableMetadata.class);
ArgumentCaptor<TableMetadata> previous = ArgumentCaptor.forClass(TableMetadata.class);
verify(listener, timeout(NOTIF_TIMEOUT_MS).times(1))
.onTableChanged(current.capture(), previous.capture());
assertThat(previous.getValue().getKeyspace()).hasName(handleId(KEYSPACE_NAME));
assertThat(previous.getValue()).hasName(handleId(TABLE_NAME));
}
assertThat(cluster.getMetadata().getKeyspace(KEYSPACE_NAME)).isNotDurableWrites();
assertThat(tabletMap.getMapping())
.doesNotContainKey(
new TabletMap.KeyspaceTableNamePair(handleId(KEYSPACE_NAME), handleId(TABLE_NAME)));
}

@AfterClass(groups = "short", alwaysRun = true)
public void teardown() {
cluster.close();
}
}

0 comments on commit 925a498

Please sign in to comment.