diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java index 73261a7bd45..c7c075edd73 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java @@ -19,6 +19,7 @@ import java.io.DataInput; import java.io.DataOutput; +import java.io.EOFException; import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValueUtil; @@ -33,6 +34,7 @@ import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.ScanUtil; public class DistinctPrefixFilter extends FilterBase implements Writable { private static byte VERSION = 1; @@ -44,6 +46,8 @@ public class DistinctPrefixFilter extends FilterBase implements Writable { private int lastPosition; private final ImmutableBytesWritable lastKey = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY, -1, -1); + private byte[] emptyCF; + private byte[] emptyCQ; public DistinctPrefixFilter() { } @@ -51,12 +55,28 @@ public DistinctPrefixFilter() { public DistinctPrefixFilter(RowKeySchema schema, int prefixLength) { this.schema = schema; this.prefixLength = prefixLength; + this.emptyCF = null; + this.emptyCQ = null; + } + + public DistinctPrefixFilter(RowKeySchema schema, int prefixLength, byte[] emptyCF, + byte[] emptyCQ) { + this(schema, prefixLength); + this.emptyCF = emptyCF; + this.emptyCQ = emptyCQ; } public void setOffset(int offset) { this.offset = offset; } + // This is used when the DistinctPrefixFilter is present on a scan on an uncovered index + public void reinitialize() { + lastKey.set(ByteUtil.EMPTY_BYTE_ARRAY, -1, -1); + lastPosition = -1; + filterAll = false; + } + // No @Override for HBase 3 compatibility public ReturnCode filterKeyValue(Cell v) throws IOException { return filterCell(v); @@ -64,6 +84,10 @@ public ReturnCode filterKeyValue(Cell v) throws IOException { @Override public ReturnCode filterCell(Cell v) throws IOException { + if (emptyCF != null && emptyCQ != null && !ScanUtil.isEmptyColumn(v, emptyCF, emptyCQ)) { + // wait for the empty column + return ReturnCode.NEXT_COL; + } ImmutableBytesWritable ptr = new ImmutableBytesWritable(); // First determine the prefix based on the schema @@ -151,6 +175,12 @@ public void write(DataOutput out) throws IOException { out.writeByte(VERSION); schema.write(out); out.writeInt(prefixLength); + if (emptyCF != null && emptyCQ != null) { + out.writeInt(emptyCF.length); + out.write(emptyCF); + out.writeInt(emptyCQ.length); + out.write(emptyCQ); + } } @Override @@ -159,6 +189,18 @@ public void readFields(DataInput in) throws IOException { schema = new RowKeySchema(); schema.readFields(in); prefixLength = in.readInt(); + try { + int length = in.readInt(); + emptyCF = new byte[length]; + in.readFully(emptyCF, 0, length); + length = in.readInt(); + emptyCQ = new byte[length]; + in.readFully(emptyCQ, 0, length); + } catch (EOFException e) { + // Older client doesn't send empty column information + emptyCF = null; + emptyCQ = null; + } } @Override diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java index a5d78112c24..03e0168c3b9 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java @@ -39,7 +39,6 @@ public class EmptyColumnOnlyFilter extends FilterBase implements Writable { private byte[] emptyCQ; private boolean found = false; private boolean first = true; - private Cell emptyColumnCell = null; public EmptyColumnOnlyFilter() { } @@ -55,7 +54,6 @@ public EmptyColumnOnlyFilter(byte[] emptyCF, byte[] emptyCQ) { public void reset() throws IOException { found = false; first = true; - emptyColumnCell = null; } // No @Override for HBase 3 compatibility @@ -70,7 +68,6 @@ public ReturnCode filterCell(final Cell cell) throws IOException { } if (ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) { found = true; - emptyColumnCell = cell; return ReturnCode.INCLUDE; } if (first) { @@ -82,22 +79,8 @@ public ReturnCode filterCell(final Cell cell) throws IOException { @Override public void filterRowCells(List kvs) throws IOException { - if (kvs.size() > 2) { - throw new IOException("EmptyColumnOnlyFilter got unexpected cells: " + kvs.size()); - } else if (kvs.size() == 2) { - // remove the first cell and only return the empty column cell + if (kvs.size() > 1) { kvs.remove(0); - } else if (kvs.size() == 1) { - // we only have 1 cell, check if it is the empty column cell or not - // since the empty column cell could have been excluded by another filter like the - // DistinctPrefixFilter. - Cell cell = kvs.get(0); - if (found && !ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) { - // we found the empty cell, but it was not included so replace the existing cell - // with the empty column cell - kvs.remove(0); - kvs.add(emptyColumnCell); - } } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 328b2eb870b..45e160fac8a 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -336,9 +336,12 @@ private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer && groupBy.isOrderPreserving() && (context.getAggregationManager().isEmpty() || groupBy.isUngroupedAggregate()) ) { - - ScanUtil.andFilterAtEnd(scan, - new DistinctPrefixFilter(plan.getTableRef().getTable().getRowKeySchema(), cols)); + byte[] ecf = SchemaUtil.getEmptyColumnFamily(table); + byte[] ecq = table.getEncodingScheme() == NON_ENCODED_QUALIFIERS + ? QueryConstants.EMPTY_COLUMN_BYTES + : table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME); + ScanUtil.andFilterAtEnd(scan, new DistinctPrefixFilter( + plan.getTableRef().getTable().getRowKeySchema(), cols, ecf, ecq)); if (!groupBy.isUngroupedAggregate() && plan.getLimit() != null) { // We can push the limit to the server,but for UngroupedAggregate // we can not push the limit. diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java index d668af16b42..6e6858cc7f5 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -1900,6 +1900,33 @@ public static SkipScanFilter removeSkipScanFilter(Scan scan) { return null; } + public static DistinctPrefixFilter findDistinctPrefixFilter(Scan scan) { + Filter filter = scan.getFilter(); + if (filter instanceof PagingFilter) { + filter = ((PagingFilter) filter).getDelegateFilter(); + } + return findDistinctPrefixFilter(filter); + } + + public static DistinctPrefixFilter findDistinctPrefixFilter(Filter filter) { + if (filter == null) { + return null; + } + if (filter instanceof DistinctPrefixFilter) { + return (DistinctPrefixFilter) filter; + } + if (filter instanceof FilterList) { + Iterator filterIterator = ((FilterList) filter).getFilters().iterator(); + while (filterIterator.hasNext()) { + DistinctPrefixFilter distinctFilter = findDistinctPrefixFilter(filterIterator.next()); + if (distinctFilter != null) { + return distinctFilter; + } + } + } + return null; + } + /** * Verify whether the given row key is in the scan boundaries i.e. scan start and end keys. * @param ptr row key. diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java index 0b494418248..5bb0c402172 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java @@ -91,9 +91,9 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, final Regio final Scan scan, final RegionCoprocessorEnvironment env, final Scan dataTableScan, final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer, final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long pageSizeMs, - final long queryLimit) throws IOException { + final long queryLimit, boolean isDistinct) throws IOException { super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, - viewConstants, ptr, pageSizeMs, queryLimit); + viewConstants, ptr, pageSizeMs, queryLimit, isDistinct); CDCUtil.setupScanForCDC(dataTableScan); cdcDataTableInfo = CDCTableInfo .createFromProto(CDCInfoProtos.CDCTableDef.parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java index 0a8a0adcdef..f457a9f633e 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java @@ -88,9 +88,9 @@ public UncoveredGlobalIndexRegionScanner(final RegionScanner innerScanner, final final Scan scan, final RegionCoprocessorEnvironment env, final Scan dataTableScan, final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer, final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long pageSizeMs, - final long queryLimit) throws IOException { + final long queryLimit, boolean isDistinct) throws IOException { super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, - viewConstants, ptr, pageSizeMs, queryLimit); + viewConstants, ptr, pageSizeMs, queryLimit, isDistinct); final Configuration config = env.getConfiguration(); hTableFactory = IndexWriterUtils.getDefaultDelegateHTableFactory(env); rowCountPerTask = diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java index 1b126a6591c..640b85bc5ac 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java @@ -113,16 +113,25 @@ public UncoveredIndexRegionScanner(final RegionScanner innerScanner, final Regio final Scan scan, final RegionCoprocessorEnvironment env, final Scan dataTableScan, final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer, final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long pageSizeMs, - final long queryLimit) { + final long queryLimit, boolean isDistinct) { super(innerScanner); final Configuration config = env.getConfiguration(); - byte[] pageSizeFromScan = scan.getAttribute(INDEX_PAGE_ROWS); - if (pageSizeFromScan != null) { - pageSizeInRows = (int) Bytes.toLong(pageSizeFromScan); + if (isDistinct) { + // If the scan has a DistinctPrefix filter set the batch size to 1. This is because we don't + // want to skip rows without first checking if the row is valid or not and passes any + // additional filters evaluated after merging with the data table. Using a batch of + // size 1 is OK when distinct prefix filter is used since if the row is valid we will jump to + // the next unique prefix so ideally we should be scanning very few rows. + pageSizeInRows = 1; } else { - pageSizeInRows = (int) config.getLong(INDEX_PAGE_SIZE_IN_ROWS, - QueryServicesOptions.DEFAULT_INDEX_PAGE_SIZE_IN_ROWS); + byte[] pageSizeFromScan = scan.getAttribute(INDEX_PAGE_ROWS); + if (pageSizeFromScan != null) { + pageSizeInRows = (int) Bytes.toLong(pageSizeFromScan); + } else { + pageSizeInRows = (int) config.getLong(INDEX_PAGE_SIZE_IN_ROWS, + QueryServicesOptions.DEFAULT_INDEX_PAGE_SIZE_IN_ROWS); + } } if (queryLimit != -1) { pageSizeInRows = Long.min(pageSizeInRows, queryLimit); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java index 6b5d124ce04..43c3eb4e48a 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java @@ -49,9 +49,9 @@ public UncoveredLocalIndexRegionScanner(final RegionScanner innerScanner, final final Scan scan, final RegionCoprocessorEnvironment env, final Scan dataTableScan, final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer, final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long pageSizeMs, - final int offset, final byte[] actualStartKey, final long queryLimit) { + final int offset, final byte[] actualStartKey, final long queryLimit, boolean isDistinct) { super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, - viewConstants, ptr, pageSizeMs, queryLimit); + viewConstants, ptr, pageSizeMs, queryLimit, isDistinct); this.offset = offset; this.actualStartKey = actualStartKey; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java index 7b0a2e3f82c..d9dcd17eea7 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java @@ -48,8 +48,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext; @@ -63,7 +61,6 @@ import org.apache.phoenix.coprocessor.DataTableScanMetrics; import org.apache.phoenix.coprocessor.DelegateRegionScanner; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; -import org.apache.phoenix.filter.EmptyColumnOnlyFilter; import org.apache.phoenix.filter.PagingFilter; import org.apache.phoenix.filter.UnverifiedRowFilter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; @@ -237,23 +234,7 @@ private void init() throws IOException { } private boolean shouldCreateUnverifiedRowFilter(Filter delegateFilter) { - if (delegateFilter == null) { - return false; - } - Filter wrappedFilter = delegateFilter; - if (delegateFilter instanceof FilterList) { - List filters = ((FilterList) delegateFilter).getFilters(); - wrappedFilter = filters.get(0); - } - // Optimization since FirstKeyOnlyFilter and EmptyColumnOnlyFilter - // always include the empty column in the scan result - if ( - wrappedFilter instanceof FirstKeyOnlyFilter - || wrappedFilter instanceof EmptyColumnOnlyFilter - ) { - return false; - } - return true; + return delegateFilter != null && !indexMaintainer.isUncovered(); } public boolean next(List result, boolean raw, ScannerContext scannerContext) @@ -630,7 +611,7 @@ private boolean verifyRowAndRepairIfNecessary(List cellList) throws IOExce long repairStart = EnvironmentEdgeManager.currentTimeMillis(); byte[] rowKey = CellUtil.cloneRow(cell); - long ts = cellList.get(0).getTimestamp(); + long ts = getMaxTimestamp(cellList); cellList.clear(); long repairTime; try { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java index bda112e7092..1cbb6f67d35 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java @@ -52,6 +52,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.filter.DistinctPrefixFilter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.query.QueryConstants; @@ -117,6 +118,7 @@ public RegionScanner getWrappedScanner(final RegionCoprocessorEnvironment env, final long pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan); Expression extraWhere = null; long extraLimit = -1; + DistinctPrefixFilter distinctFilter = null; { // for indexes construct the row filter for uncovered columns if it exists @@ -169,19 +171,22 @@ public RegionScanner getWrappedScanner(final RegionCoprocessorEnvironment env, dataTableScan.addColumn(column.getFamily(), column.getQualifier()); } } + // If the DistinctPrefix filter is present on the scan we set the batch size to 1 + // when scanning uncovered index rows + distinctFilter = ScanUtil.findDistinctPrefixFilter(scan); if (ScanUtil.isLocalIndex(scan)) { s = new UncoveredLocalIndexRegionScanner(regionScanner, dataRegion, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, - offset, actualStartKey, extraLimit); + offset, actualStartKey, extraLimit, distinctFilter != null); } else { if (scan.getAttribute(CDC_DATA_TABLE_DEF) != null) { s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, - extraLimit); + extraLimit, distinctFilter != null); } else { s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, - extraLimit); + extraLimit, distinctFilter != null); } } } @@ -253,6 +258,11 @@ public boolean nextRaw(List result, ScannerContext scannerContext) throws return true; } if (result.size() == 0) { + if (distinctFilter != null) { + // we got an orphaned uncovered index row just reinitialize the distinct filter and + // move to the new row + distinctFilter.reinitialize(); + } return next; } if ((ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) { @@ -274,6 +284,12 @@ public boolean nextRaw(List result, ScannerContext scannerContext) throws extraWhere.evaluate(merged, ptr); if (!Boolean.TRUE.equals(extraWhere.getDataType().toObject(ptr))) { result.clear(); + if (distinctFilter != null) { + // The current row was rejected after evaluating the extra where conditions. + // We can't skip to the next unique key prefix as that could result in skipping + // valid result so reinitialize the distinct filter and move to the next row + distinctFilter.reinitialize(); + } return next; } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java index f5097875995..a2487cd7789 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java @@ -21,6 +21,7 @@ import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DELETE; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_VIEW_INDEX; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ADD; +import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_DISTINCT; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_DIVERGED_VIEW; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_OFFSET; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ORDERED_GROUP_BY; @@ -31,6 +32,7 @@ import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD_DELETE; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_ADD; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_DIVERGED_VIEW; +import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_DISTINCT; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_INDEX_REBUILD_ASYNC; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_OFFSET; import static org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ORDERED_GROUP_BY; @@ -475,6 +477,13 @@ public void testOrderByNonPkAddDataByNewClientReadByOldClient() throws Exception assertExpectedOutput(QUERY_ORDER_BY_NON_PK); } + @Test + public void testDistinctPrefixAddDataByNewClientReadByOldClient() throws Exception { + executeQueriesWithCurrentVersion(CREATE_DISTINCT, url, NONE); + executeQueryWithClientVersion(compatibleClientVersion, QUERY_DISTINCT, zkQuorum); + assertExpectedOutput(QUERY_DISTINCT); + } + private boolean isClientCompatibleForOrderedGroupByQuery() { String[] clientVersion = compatibleClientVersion.getVersion().split("\\."); int majorVersion = Integer.parseInt(clientVersion[0]); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java index d72e415085a..bfea0ed7162 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java @@ -84,10 +84,12 @@ public final class BackwardCompatibilityTestUtil { public static final String ORDERED_GROUP_BY = "ordered_groupby"; public static final String ORDER_BY_NON_PK = "orderby_nonpk"; public static final String OFFSET = "offset"; + public static final String DISTINCT = "distinct"; public static final String CREATE_UNORDERED_GROUP_BY = "create_" + UNORDERED_GROUP_BY; public static final String CREATE_ORDERED_GROUP_BY = "create_" + ORDERED_GROUP_BY; public static final String CREATE_ORDER_BY_NON_PK = "create_" + ORDER_BY_NON_PK; public static final String CREATE_OFFSET = "create_" + OFFSET; + public static final String CREATE_DISTINCT = "create_" + DISTINCT; public static final String ADD_DATA = "add_data"; public static final String ADD_DELETE = "add_delete"; public static final String ADD_VIEW_INDEX = "add_view_index"; @@ -99,6 +101,7 @@ public final class BackwardCompatibilityTestUtil { public static final String QUERY_ORDERED_GROUP_BY = QUERY_PREFIX + ORDERED_GROUP_BY; public static final String QUERY_OFFSET = QUERY_PREFIX + OFFSET; public static final String QUERY_ORDER_BY_NON_PK = QUERY_PREFIX + ORDER_BY_NON_PK; + public static final String QUERY_DISTINCT = QUERY_PREFIX + DISTINCT; public static final String QUERY_CREATE_ADD = QUERY_PREFIX + CREATE_ADD; public static final String QUERY_ADD_DATA = QUERY_PREFIX + ADD_DATA; public static final String QUERY_ADD_DELETE = QUERY_PREFIX + ADD_DELETE; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java index 0abe583a18a..7fe1144b79b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java @@ -36,12 +36,14 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Collection; @@ -1361,6 +1363,160 @@ public void testUnverifiedIndexRowWithSkipScanFilter2() throws Exception { } } + @Test + public void testReadRepairWithDistinctPrefixFilter() throws Exception { + Assume.assumeTrue(async == false); + + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexName = generateUniqueName(); + conn.createStatement().execute("create table " + dataTableName + + " (id1 varchar(10) not null, id2 varchar(10) not null, val1 varchar(10), val2 varchar(10), " + + "val3 varchar(10), val4 varchar(10) constraint pk primary key(id1, id2))" + + tableDDLOptions); + conn.createStatement().execute("CREATE INDEX " + indexName + " on " + dataTableName + + " (val1, val2) include (val3, val4)" + this.indexDDLOptions); + + // create orphan unverified index row + IndexRegionObserver.setFailDataTableUpdatesForTesting(true); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a1', 'a2', 'val1', 'val2a', 'val3', 'val4')"); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a3', 'a2', 'val1', 'val2a', 'val3', 'val4')"); + commitWithException(conn); + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a1', 'a3', 'val1', 'val2a', 'val31', 'val4')"); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a2', 'a1', 'val1', 'val2a', 'val31', 'val4')"); + conn.commit(); + + // create an unverified update to the index row pointing to an existing data row + IndexRegionObserver.setFailDataTableUpdatesForTesting(true); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a2', 'a1', 'val1', 'val1b', 'val3', 'val4')"); + commitWithException(conn); + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a2', 'a2', 'val1', 'val1b', 'val3', 'val4')"); + conn.commit(); + + ArrayList expectedValues = Lists.newArrayList("a1", "a2"); + String selectSql = + "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1' AND val2 = 'val2a'"; + verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues); + + expectedValues = Lists.newArrayList("a2"); + selectSql = + "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1' AND val2 = 'val1b'"; + verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues); + + IndexRegionObserver.setFailPostIndexUpdatesForTesting(true); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a3', 'a2', 'val1', 'val2a', 'val3', 'val4')"); + conn.commit(); + IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); + expectedValues = Lists.newArrayList("a1", "a2", "a3"); + selectSql = + "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1' AND val2 = 'val2a'"; + verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues); + + // first verified and then verified + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a4', 'a1', 'val1_4', 'val1_4', 'val1_4', 'val1_4')"); + conn.commit(); + IndexRegionObserver.setFailPostIndexUpdatesForTesting(true); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a4', 'a2', 'val1_4', 'val1_4', 'val1_4', 'val1_4')"); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a5', 'a1', 'val1_4', 'val1_4', 'val1_4', 'val1_4')"); + conn.commit(); + IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); + expectedValues = Lists.newArrayList("a4", "a5"); + selectSql = + "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1_4' AND val2 = 'val1_4'"; + verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues); + } + } + + @Test + public void testUncoveredIndexWithDistinctPrefixFilter() throws Exception { + Assume.assumeTrue(async == false); + + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String uncoveredIndex1 = generateUniqueName(); + conn.createStatement().execute("create table " + dataTableName + + " (id1 varchar(10) not null, id2 varchar(10) not null, val1 varchar(10), val2 varchar(10), " + + "val3 varchar(10), val4 varchar(10) constraint pk primary key(id1, id2))" + + tableDDLOptions); + conn.createStatement().execute("CREATE UNCOVERED INDEX " + uncoveredIndex1 + " on " + + dataTableName + " (val1)" + this.indexDDLOptions); + + // create orphan unverified index row + IndexRegionObserver.setFailDataTableUpdatesForTesting(true); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a1', 'a1', 'val1a', 'val2a', 'val3', 'val4')"); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a1', 'a2', 'val1a', 'val2a', 'val3', 'val4')"); + commitWithException(conn); + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a1', 'a3', 'val1a', 'val2a', 'val31', 'val4')"); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a1', 'a4', 'val1a', 'val2b', 'val31', 'val4')"); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a1', 'a5', 'val1a', 'val2b', 'val31', 'val4')"); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a2', 'a1', 'val1a', 'val2a', 'val31', 'val4')"); + conn.commit(); + + ArrayList expectedValues = Lists.newArrayList("a1", "a2"); + String selectSql = "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1a'"; + verifyDistinctQueryOnIndex(conn, uncoveredIndex1, selectSql, expectedValues); + expectedValues = Lists.newArrayList("a1"); + // add extra where conditions to the query + selectSql = + "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1a' AND val2 = 'val2b'"; + verifyDistinctQueryOnIndex(conn, uncoveredIndex1, selectSql, expectedValues); + + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a3', 'a1', 'val1b', 'val2a', 'val31', 'val4')"); + conn.commit(); + IndexRegionObserver.setFailDataTableUpdatesForTesting(true); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a3', 'a2', 'val1b', 'val2a', 'val3', 'val4')"); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a3', 'a3', 'val1b', 'val2a', 'val3', 'val4')"); + commitWithException(conn); + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); + conn.createStatement().execute("upsert into " + dataTableName + " " + + "values ('a4', 'a1', 'val1b', 'val2a', 'val31', 'val4')"); + conn.commit(); + expectedValues = Lists.newArrayList("a3", "a4"); + selectSql = "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1b'"; + verifyDistinctQueryOnIndex(conn, uncoveredIndex1, selectSql, expectedValues); + } + } + + private void verifyDistinctQueryOnIndex(Connection conn, String indexName, String query, + List expectedValues) throws SQLException, IOException { + try (ResultSet rs = conn.createStatement().executeQuery(query)) { + PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class); + String actualExplainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator()); + assertTrue(actualExplainPlan.contains(indexName)); + assertTrue(actualExplainPlan, actualExplainPlan.contains("SERVER DISTINCT PREFIX FILTER")); + List actualValues = Lists.newArrayList(); + while (rs.next()) { + actualValues.add(rs.getString(1)); + } + assertEquals(expectedValues, actualValues); + } catch (AssertionError e) { + TestUtil.dumpTable(conn, TableName.valueOf(indexName)); + throw e; + } + } + @Test public void testUnverifiedIndexRowWithFirstKeyOnlyFilter() throws Exception { if (async) { diff --git a/phoenix-core/src/it/resources/gold_files/gold_query_distinct.txt b/phoenix-core/src/it/resources/gold_files/gold_query_distinct.txt new file mode 100644 index 00000000000..df1400b48fd --- /dev/null +++ b/phoenix-core/src/it/resources/gold_files/gold_query_distinct.txt @@ -0,0 +1,27 @@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'COUNT(1)' +'12' +'ID1','ID2' +'a','1' +'a','2' +'b','1' +'b','2' +'c','1' +'c','2' diff --git a/phoenix-core/src/it/resources/sql_files/create_distinct.sql b/phoenix-core/src/it/resources/sql_files/create_distinct.sql new file mode 100644 index 00000000000..29943787cba --- /dev/null +++ b/phoenix-core/src/it/resources/sql_files/create_distinct.sql @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS SCHEMA_0002.TABLE_0002 (ID1 VARCHAR NOT NULL, + ID2 VARCHAR NOT NULL, + ID3 VARCHAR NOT NULL, + COL1 VARCHAR, + COL2 INTEGER CONSTRAINT PK PRIMARY KEY (ID1, ID2, ID3)); + +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('a','1','x','data1', 10); +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('a','1','y','data2', 20); +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('a','2','x','data3', 30); +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('a','2','y','data4', 40); +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('b','1','x','data5', 50); +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('b','1','y','data6', 60); +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('b','2','x','data7', 70); +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('b','2','y','data8', 80); +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('c','1','x','data9', 90); +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('c','1','y','data10', 100); +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('c','2','x','data11', 110); +UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('c','2','y','data12', 120); \ No newline at end of file diff --git a/phoenix-core/src/it/resources/sql_files/query_distinct.sql b/phoenix-core/src/it/resources/sql_files/query_distinct.sql new file mode 100644 index 00000000000..9f7578b0081 --- /dev/null +++ b/phoenix-core/src/it/resources/sql_files/query_distinct.sql @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +SELECT COUNT(*) FROM SCHEMA_0002.TABLE_0002; + +SELECT DISTINCT ID1, ID2 FROM SCHEMA_0002.TABLE_0002 ORDER BY ID1, ID2; \ No newline at end of file