Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -64,6 +65,7 @@
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TimeType;
Expand All @@ -73,6 +75,7 @@
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.LoadPlan.RangeType;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.constraints.Constraint;
Expand Down Expand Up @@ -715,6 +718,67 @@ public void testExceptionInMetadataUpdate() throws Exception {
}
}

@Test
public void testManyTabletAndFiles() throws Exception {
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
String dir = getDir("/testBulkFile-");
FileSystem fs = getCluster().getFileSystem();
fs.mkdirs(new Path(dir));

// Pre-split the table into ~9000 tablets so the bulk import has to map
// one file per tablet across many extents.
TreeSet<Text> splits = IntStream.range(1, 9000).mapToObj(BulkNewIT::row).map(Text::new)
.collect(Collectors.toCollection(TreeSet::new));
c.tableOperations().addSplits(tableName, splits);

final int numTasks = 16;
var executor = Executors.newFixedThreadPool(numTasks);
var loadPlanBuilder = LoadPlan.builder();
// row(data) values are Strings; type the set explicitly instead of
// letting var infer HashSet<Object>
var rowsExpected = new HashSet<String>();
try {
var futures = new ArrayList<Future<?>>();
// wait for a portion of the tasks to be ready
CountDownLatch startLatch = new CountDownLatch(numTasks);
assertTrue(numTasks >= startLatch.getCount(),
"Not enough tasks/threads to satisfy latch count - deadlock risk");

var imports = IntStream.range(2, 8999).boxed().collect(Collectors.toList());
// The order in which imports are added to the load plan should not matter so test that.
Collections.shuffle(imports);
for (var data : imports) {
String filename = "f" + data + ".";
loadPlanBuilder.loadFileTo(filename + RFile.EXTENSION, RangeType.TABLE, row(data - 1),
row(data));
var future = executor.submit(() -> {
startLatch.countDown();
startLatch.await();
writeData(fs, dir + "/" + filename, aconf, data, data);
return null;
});
futures.add(future);
rowsExpected.add(row(data));
}
assertEquals(imports.size(), futures.size());

// propagate any failure from the file-writing tasks
for (var future : futures) {
future.get();
}
} finally {
// Always release the pool, even if a write task failed; shutdownNow
// interrupts any tasks still parked on the start latch.
executor.shutdownNow();
}

var loadPlan = loadPlanBuilder.build();

c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();

// using a batch scanner can read from lots of tablets w/ less RPCs
try (var scanner = c.createBatchScanner(tableName)) {
// use a scan server so that tablets do not need to be hosted
scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
scanner.setRanges(List.of(new Range()));
var rowsSeen = scanner.stream().map(e -> e.getKey().getRowData().toString())
.collect(Collectors.toSet());
assertEquals(rowsExpected, rowsSeen);
}
}
}

@Test
public void testManyTablets() throws Exception {

Expand Down