Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ScanServerRefFile format to sort on UUID #4691

Merged
merged 7 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,50 +22,70 @@
import java.util.Objects;
import java.util.UUID;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

public class ScanServerRefTabletFile extends ReferencedTabletFile {

private final Value NULL_VALUE = new Value(new byte[0]);
private final Text colf;
private final Text colq;
private final Text serverAddress;
private final Text uuid;

public ScanServerRefTabletFile(String file, String serverAddress, UUID serverLockUUID) {
public ScanServerRefTabletFile(UUID serverLockUUID, String serverAddress, String file) {
super(new Path(URI.create(file)));
this.colf = new Text(serverAddress);
this.colq = new Text(serverLockUUID.toString());
this.serverAddress = new Text(serverAddress);
uuid = new Text(serverLockUUID.toString());
}

public ScanServerRefTabletFile(String file, Text colf, Text colq) {
public ScanServerRefTabletFile(String file, String serverAddress, UUID serverLockUUID) {
super(new Path(URI.create(file)));
this.colf = colf;
this.colq = colq;
this.serverAddress = new Text(serverAddress);
this.uuid = new Text(serverLockUUID.toString());
}

public String getRow() {
return this.getNormalizedPathStr();
public ScanServerRefTabletFile(Key k) {
super(new Path(URI.create(k.getColumnQualifier().toString())));
serverAddress = k.getColumnFamily();
uuid = k.getRow();
}

public Text getServerAddress() {
return this.colf;
/**
 * Builds the insert mutation for this scan server file reference using the
 * UUID-first layout: row = server lock UUID, column family = server address,
 * column qualifier = normalized file path, value = empty (see getValue()).
 */
public Mutation putMutation() {
Mutation mutation = new Mutation(uuid.toString());
mutation.put(serverAddress, getFilePath(), getValue());
return mutation;
}

/**
 * Builds the delete mutation that removes exactly the entry written by
 * putMutation(): row = server lock UUID, family = server address,
 * qualifier = normalized file path.
 */
public Mutation putDeleteMutation() {
Mutation mutation = new Mutation(uuid.toString());
mutation.putDelete(serverAddress, getFilePath());
return mutation;
}

public Text getServerLockUUID() {
return this.colq;
/** Returns the normalized file path as a Text, as stored in the column qualifier. */
public Text getFilePath() {
return new Text(this.getNormalizedPathStr());
}

/**
 * Parses the stored UUID text (the row of this reference) back into a UUID.
 *
 * @throws IllegalArgumentException if the stored text is not a valid UUID string
 */
public UUID getServerLockUUID() {
return UUID.fromString(uuid.toString());
}

/** Returns the shared empty Value used for all scan server ref entries. */
public Value getValue() {
return NULL_VALUE;
}

/** Returns the scan server address, as stored in the column family. */
public Text getServerAddress() {
return serverAddress;
}

@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((colf == null) ? 0 : colf.hashCode());
result = prime * result + ((colq == null) ? 0 : colq.hashCode());
result = prime * result + ((serverAddress == null) ? 0 : serverAddress.hashCode());
result = prime * result + ((uuid == null) ? 0 : uuid.hashCode());
return result;
}

Expand All @@ -81,13 +101,13 @@ public boolean equals(Object obj) {
return false;
}
ScanServerRefTabletFile other = (ScanServerRefTabletFile) obj;
return Objects.equals(colf, other.colf) && Objects.equals(colq, other.colq);
return Objects.equals(serverAddress, other.serverAddress) && Objects.equals(uuid, other.uuid);
}

@Override
public String toString() {
return "ScanServerRefTabletFile [file=" + this.getRow() + ", server address=" + colf
+ ", server lock uuid=" + colq + "]";
return "ScanServerRefTabletFile [file=" + this.getNormalizedPathStr() + ", server address="
+ serverAddress + ", server lock uuid=" + uuid + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.metadata.AccumuloTable;
Expand All @@ -62,81 +60,109 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FileSystemInitializer {
public class FileSystemInitializer {
private static final String TABLE_TABLETS_TABLET_DIR = "table_info";
private static final Logger log = LoggerFactory.getLogger(FileSystemInitializer.class);
private static final Text SPLIT_POINT =
MetadataSchema.TabletsSection.getRange().getEndKey().getRow();

// config only for root table
private final InitialConfiguration initConfig;

FileSystemInitializer(InitialConfiguration initConfig, ZooReaderWriter zoo, InstanceId uuid) {
public FileSystemInitializer(InitialConfiguration initConfig) {
this.initConfig = initConfig;
}

private static class Tablet {
public static class InitialTablet {
TableId tableId;
String dirName;
Text prevEndRow, endRow;
Text prevEndRow, endRow, extent;
String[] files;

Tablet(TableId tableId, String dirName, Text prevEndRow, Text endRow, String... files) {
InitialTablet(TableId tableId, String dirName, Text prevEndRow, Text endRow, String... files) {
this.tableId = tableId;
this.dirName = dirName;
this.prevEndRow = prevEndRow;
this.endRow = endRow;
this.files = files;
this.extent = new Text(MetadataSchema.TabletsSection.encodeRow(this.tableId, this.endRow));
}

/**
 * Builds the sorted metadata entries describing this initial tablet, keyed
 * under its encoded extent row: directory name, logical time starting at 0,
 * encoded previous end row, and one zero-size data-file entry per file.
 * A TreeMap is used because callers write these entries to an RFile, which
 * requires keys in sorted order.
 */
private TreeMap<Key,Value> createEntries() {
TreeMap<Key,Value> sorted = new TreeMap<>();
Value EMPTY_SIZE = new DataFileValue(0, 0).encodeAsValue();
sorted.put(new Key(this.extent, DIRECTORY_COLUMN.getColumnFamily(),
DIRECTORY_COLUMN.getColumnQualifier(), 0), new Value(this.dirName));
sorted.put(
new Key(this.extent, TIME_COLUMN.getColumnFamily(), TIME_COLUMN.getColumnQualifier(), 0),
new Value(new MetadataTime(0, TimeType.LOGICAL).encode()));
sorted.put(
new Key(this.extent, PREV_ROW_COLUMN.getColumnFamily(),
PREV_ROW_COLUMN.getColumnQualifier(), 0),
MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow(this.prevEndRow));
// every initial file starts with an empty DataFileValue (0 entries, 0 size)
for (String file : this.files) {
var col =
new ColumnFQ(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME, new Text(file));
sorted.put(new Key(extent, col.getColumnFamily(), col.getColumnQualifier(), 0), EMPTY_SIZE);
}
return sorted;
}

/**
 * Converts this tablet's metadata entries (see createEntries()) into a single
 * Mutation on the tablet's extent row, suitable for writing to a metadata table.
 */
public Mutation createMutation() {
Mutation mutation = new Mutation(this.extent);
for (Map.Entry<Key,Value> entry : createEntries().entrySet()) {
mutation.put(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier(),
entry.getValue());
}
return mutation;
}

}

void initialize(VolumeManager fs, String rootTabletDirUri, String rootTabletFileUri,
ServerContext context) throws IOException, InterruptedException, KeeperException {
SiteConfiguration siteConfig = initConfig.getSiteConf();
// initialize initial system tables config in zookeeper
initSystemTablesConfig(context);

Text splitPoint = MetadataSchema.TabletsSection.getRange().getEndKey().getRow();

VolumeChooserEnvironment chooserEnv = new VolumeChooserEnvironmentImpl(
VolumeChooserEnvironment.Scope.INIT, AccumuloTable.METADATA.tableId(), splitPoint, context);
String tableMetadataTabletDirName = TABLE_TABLETS_TABLET_DIR;
String tableMetadataTabletDirUri =
VolumeChooserEnvironment.Scope.INIT, AccumuloTable.METADATA.tableId(), null, context);
String defaultMetadataTabletDirName =
MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;
String defaultMetadataTabletDirUri =
fs.choose(chooserEnv, context.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+ AccumuloTable.METADATA.tableId() + Path.SEPARATOR + tableMetadataTabletDirName;
+ AccumuloTable.METADATA.tableId() + Path.SEPARATOR + defaultMetadataTabletDirName;

chooserEnv = new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.Scope.INIT,
AccumuloTable.SCAN_REF.tableId(), null, context);
String scanRefTableDefaultTabletDirName =
MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;
String scanRefTableDefaultTabletDirUri =
AccumuloTable.METADATA.tableId(), SPLIT_POINT, context);

String tableMetadataTabletDirUri =
fs.choose(chooserEnv, context.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+ AccumuloTable.SCAN_REF.tableId() + Path.SEPARATOR + scanRefTableDefaultTabletDirName;
+ AccumuloTable.METADATA.tableId() + Path.SEPARATOR + TABLE_TABLETS_TABLET_DIR;

chooserEnv = new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.Scope.INIT,
AccumuloTable.METADATA.tableId(), null, context);
String defaultMetadataTabletDirName =
MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;
String defaultMetadataTabletDirUri =
fs.choose(chooserEnv, context.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+ AccumuloTable.METADATA.tableId() + Path.SEPARATOR + defaultMetadataTabletDirName;
AccumuloTable.SCAN_REF.tableId(), null, context);

String scanRefTableDefaultTabletDirUri = fs.choose(chooserEnv, context.getBaseUris())
+ Constants.HDFS_TABLES_DIR + Path.SEPARATOR + AccumuloTable.SCAN_REF.tableId()
+ Path.SEPARATOR + MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;

// create table and default tablets directories
createDirectories(fs, rootTabletDirUri, tableMetadataTabletDirUri, defaultMetadataTabletDirUri,
createDirectories(fs, rootTabletDirUri, defaultMetadataTabletDirUri, tableMetadataTabletDirUri,
scanRefTableDefaultTabletDirUri);

String ext = FileOperations.getNewFileExtension(DefaultConfiguration.getInstance());
InitialTablet scanRefTablet = createScanRefTablet(context);

// populate the metadata tablet with info about scan ref tablets
String ext = FileOperations.getNewFileExtension(DefaultConfiguration.getInstance());
String metadataFileName = tableMetadataTabletDirUri + Path.SEPARATOR + "0_1." + ext;
Tablet scanRefTablet =
new Tablet(AccumuloTable.SCAN_REF.tableId(), scanRefTableDefaultTabletDirName, null, null);
createMetadataFile(fs, metadataFileName, siteConfig, scanRefTablet);
createMetadataFile(fs, metadataFileName, scanRefTablet);

// populate the root tablet with info about the metadata table's two initial tablets
Tablet tablesTablet = new Tablet(AccumuloTable.METADATA.tableId(), tableMetadataTabletDirName,
null, splitPoint, StoredTabletFile.of(new Path(metadataFileName)).getMetadata());
Tablet defaultTablet = new Tablet(AccumuloTable.METADATA.tableId(),
defaultMetadataTabletDirName, splitPoint, null);
createMetadataFile(fs, rootTabletFileUri, siteConfig, tablesTablet, defaultTablet);
InitialTablet tablesTablet =
new InitialTablet(AccumuloTable.METADATA.tableId(), TABLE_TABLETS_TABLET_DIR, null,
SPLIT_POINT, StoredTabletFile.of(new Path(metadataFileName)).getMetadata());
InitialTablet defaultTablet = new InitialTablet(AccumuloTable.METADATA.tableId(),
defaultMetadataTabletDirName, SPLIT_POINT, null);
createMetadataFile(fs, rootTabletFileUri, tablesTablet, defaultTablet);
}

private void createDirectories(VolumeManager fs, String... dirs) throws IOException {
Expand Down Expand Up @@ -164,7 +190,6 @@ private void initSystemTablesConfig(final ServerContext context)
setTableProperties(context, AccumuloTable.ROOT.tableId(), initConfig.getRootMetaConf());
setTableProperties(context, AccumuloTable.METADATA.tableId(), initConfig.getRootMetaConf());
setTableProperties(context, AccumuloTable.METADATA.tableId(), initConfig.getMetaTableConf());
setTableProperties(context, AccumuloTable.SCAN_REF.tableId(), initConfig.getScanRefTableConf());
}

private void setTableProperties(final ServerContext context, TableId tableId,
Expand All @@ -179,12 +204,8 @@ private void setTableProperties(final ServerContext context, TableId tableId,
}

private void createMetadataFile(VolumeManager volmanager, String fileName,
AccumuloConfiguration conf, Tablet... tablets) throws IOException {
// sort file contents in memory, then play back to the file
TreeMap<Key,Value> sorted = new TreeMap<>();
for (Tablet tablet : tablets) {
createEntriesForTablet(sorted, tablet);
}
InitialTablet... initialTablets) throws IOException {
AccumuloConfiguration conf = initConfig.getSiteConf();
ReferencedTabletFile file = ReferencedTabletFile.of(new Path(fileName));
FileSystem fs = volmanager.getFileSystemByPath(file.getPath());

Expand All @@ -194,28 +215,22 @@ private void createMetadataFile(VolumeManager volmanager, String fileName,
.forFile(file, fs, fs.getConf(), cs).withTableConfiguration(conf).build();
tabletWriter.startDefaultLocalityGroup();

TreeMap<Key,Value> sorted = new TreeMap<>();
for (InitialTablet initialTablet : initialTablets) {
// sort file contents in memory, then play back to the file
sorted.putAll(initialTablet.createEntries());
}

for (Map.Entry<Key,Value> entry : sorted.entrySet()) {
tabletWriter.append(entry.getKey(), entry.getValue());
}

tabletWriter.close();
}

private void createEntriesForTablet(TreeMap<Key,Value> map, Tablet tablet) {
Value EMPTY_SIZE = new DataFileValue(0, 0).encodeAsValue();
Text extent = new Text(MetadataSchema.TabletsSection.encodeRow(tablet.tableId, tablet.endRow));
addEntry(map, extent, DIRECTORY_COLUMN, new Value(tablet.dirName));
addEntry(map, extent, TIME_COLUMN, new Value(new MetadataTime(0, TimeType.LOGICAL).encode()));
addEntry(map, extent, PREV_ROW_COLUMN,
MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow(tablet.prevEndRow));
for (String file : tablet.files) {
addEntry(map, extent,
new ColumnFQ(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME, new Text(file)),
EMPTY_SIZE);
}
}
public InitialTablet createScanRefTablet(ServerContext context) throws IOException {
setTableProperties(context, AccumuloTable.SCAN_REF.tableId(), initConfig.getScanRefTableConf());

private void addEntry(TreeMap<Key,Value> map, Text row, ColumnFQ col, Value value) {
map.put(new Key(row, col.getColumnFamily(), col.getColumnQualifier(), 0), value);
return new InitialTablet(AccumuloTable.SCAN_REF.tableId(),
MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
import org.apache.hadoop.conf.Configuration;

class InitialConfiguration {
public class InitialConfiguration {

// config only for root table
private final HashMap<String,String> initialRootConf = new HashMap<>();
Expand All @@ -47,7 +47,7 @@ class InitialConfiguration {
private final Configuration hadoopConf;
private final SiteConfiguration siteConf;

InitialConfiguration(Configuration hadoopConf, SiteConfiguration siteConf) {
public InitialConfiguration(Configuration hadoopConf, SiteConfiguration siteConf) {
this.hadoopConf = hadoopConf;
this.siteConf = siteConf;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private boolean doInit(ZooReaderWriter zoo, Opts opts, VolumeManager fs,
if (!createDirs(fs, instanceId, initConfig.getVolumeUris())) {
throw new IOException("Problem creating directories on " + fs.getVolumes());
}
var fileSystemInitializer = new FileSystemInitializer(initConfig, zoo, instanceId);
var fileSystemInitializer = new FileSystemInitializer(initConfig);
var rootVol = fs.choose(chooserEnv, initConfig.getVolumeUris());
var rootPath = new Path(rootVol + SEPARATOR + TABLE_DIR + SEPARATOR
+ AccumuloTable.ROOT.tableId() + SEPARATOR + rootTabletDirName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ void initialize(final ServerContext context, final boolean clearInstanceName,
TableManager.prepareNewNamespaceState(context, Namespace.ACCUMULO.id(),
Namespace.ACCUMULO.name(), ZooUtil.NodeExistsPolicy.FAIL);

for (AccumuloTable table : AccumuloTable.values()) {
TableManager.prepareNewTableState(context, table.tableId(), Namespace.ACCUMULO.id(),
table.tableName(), TableState.ONLINE, ZooUtil.NodeExistsPolicy.FAIL);
}
TableManager.prepareNewTableState(context, AccumuloTable.ROOT.tableId(),
Namespace.ACCUMULO.id(), AccumuloTable.ROOT.tableName(), TableState.ONLINE,
ZooUtil.NodeExistsPolicy.FAIL);
TableManager.prepareNewTableState(context, AccumuloTable.METADATA.tableId(),
Namespace.ACCUMULO.id(), AccumuloTable.METADATA.tableName(), TableState.ONLINE,
ZooUtil.NodeExistsPolicy.FAIL);
// Call this separately so the upgrader code can handle the zk node creation for scan refs
initScanRefTableState(context);

zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS, EMPTY_BYTE_ARRAY,
ZooUtil.NodeExistsPolicy.FAIL);
Expand Down Expand Up @@ -188,4 +192,14 @@ public static byte[] getInitialRootTabletJson(String dirName, String file) {
return rootTabletJson.toJson().getBytes(UTF_8);
}

/**
 * Creates the ZooKeeper state node for the scan-ref system table and marks it
 * ONLINE. Kept separate from the other system-table state setup so upgrade
 * code can invoke it on its own (see caller comment in initialize()).
 *
 * @param context server context used to reach ZooKeeper
 * @throws IllegalStateException if the node cannot be created; the original
 *         KeeperException/InterruptedException is preserved as the cause
 */
public void initScanRefTableState(ServerContext context) {
try {
TableManager.prepareNewTableState(context, AccumuloTable.SCAN_REF.tableId(),
Namespace.ACCUMULO.id(), AccumuloTable.SCAN_REF.tableName(), TableState.ONLINE,
ZooUtil.NodeExistsPolicy.FAIL);
} catch (InterruptedException e) {
// restore the interrupt status so callers up the stack can observe it
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted creating scan ref table state", e);
} catch (KeeperException e) {
// IllegalStateException is-a RuntimeException, so existing catches still work
throw new IllegalStateException("Failed to create scan ref table state", e);
}
}

}
Loading