Skip to content

Hnsw streaming #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,63 @@ Use [partitioning](https://www.postgresql.org/docs/current/ddl-partitioning.html
CREATE TABLE items (embedding vector(3), category_id int) PARTITION BY LIST(category_id);
```

## Streaming Queries [unreleased]

*Added in 0.8.0*

With approximate indexes, you can end up with less results than expected due to filtering conditions in the query.

Starting with 0.8.0, you can enable streaming queries. If too few results from the initial index scan match the query filters, it will resume scanning until enough results are found. This can significantly improve recall (at the cost of speed).

```tsql
SET hnsw.streaming = on;
-- or
SET ivfflat.streaming = off;
```

### Streaming Options

Since scanning a large portion of the index is expensive, there are options to control when the scan ends.

#### HNSW

Specify the max number of additional tuples visited

```sql
SET hnsw.ef_stream = 10000;
```

The scan will also end if reaches `work_mem`, at which point a notice is shown

```text
NOTICE: hnsw index scan exceeded work_mem after 50000 tuples
HINT: Increase work_mem to scan more tuples.
```

Adjust this with:

```sql
SET work_mem = '8MB';
```

#### IVFFlat

Specify the max number of probes

```sql
SET ivfflat.max_probes = 100;
```

### Streaming Order

With streaming queries, it’s possible for rows to be slightly out of order by distance. For strict ordering, use:

```sql
WITH approx_order AS MATERIALIZED (
SELECT *, embedding <-> '[1,2,3]' AS distance FROM items WHERE ... ORDER BY distance LIMIT 5
) SELECT * FROM approx_order ORDER BY distance;
```

## Half-Precision Vectors

*Added in 0.7.0*
Expand Down
14 changes: 14 additions & 0 deletions src/hnsw.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#endif

int hnsw_ef_search;
int hnsw_ef_stream;
bool hnsw_streaming;
int hnsw_lock_tranche_id;
static relopt_kind hnsw_relopt_kind;

Expand Down Expand Up @@ -68,6 +70,16 @@ HnswInit(void)
"Valid range is 1..1000.", &hnsw_ef_search,
HNSW_DEFAULT_EF_SEARCH, HNSW_MIN_EF_SEARCH, HNSW_MAX_EF_SEARCH, PGC_USERSET, 0, NULL, NULL, NULL);

/* TODO Figure out name */
DefineCustomBoolVariable("hnsw.streaming", "Use streaming mode",
NULL, &hnsw_streaming,
HNSW_DEFAULT_STREAMING, PGC_USERSET, 0, NULL, NULL, NULL);

/* TODO Figure out name */
DefineCustomIntVariable("hnsw.ef_stream", "Sets the max number of additional candidates to visit for streaming search",
"-1 means all", &hnsw_ef_stream,
HNSW_DEFAULT_EF_STREAM, HNSW_MIN_EF_STREAM, HNSW_MAX_EF_STREAM, PGC_USERSET, 0, NULL, NULL, NULL);

MarkGUCPrefixReserved("hnsw");
}

Expand Down Expand Up @@ -126,6 +138,8 @@ hnswcostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
/* Account for number of tuples (or entry level), m, and ef_search */
costs.numIndexTuples = (entryLevel + 2) * m;

/* TODO Adjust for selectivity for iterative scans */

genericcostestimate(root, path, loop_count, &costs);

/* Use total cost since most work happens before first tuple is returned */
Expand Down
58 changes: 50 additions & 8 deletions src/hnsw.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
#include "utils/sampling.h"
#include "vector.h"

#ifdef HNSW_BENCH
#include "portability/instr_time.h"
#endif

#define HNSW_MAX_DIM 2000
#define HNSW_MAX_NNZ 1000

Expand Down Expand Up @@ -42,6 +46,10 @@
#define HNSW_DEFAULT_EF_SEARCH 40
#define HNSW_MIN_EF_SEARCH 1
#define HNSW_MAX_EF_SEARCH 1000
#define HNSW_DEFAULT_STREAMING false
#define HNSW_DEFAULT_EF_STREAM -1
#define HNSW_MIN_EF_STREAM -1
#define HNSW_MAX_EF_STREAM INT_MAX

/* Tuple types */
#define HNSW_ELEMENT_TUPLE_TYPE 1
Expand All @@ -68,6 +76,21 @@
#define HnswPageGetOpaque(page) ((HnswPageOpaque) PageGetSpecialPointer(page))
#define HnswPageGetMeta(page) ((HnswMetaPageData *) PageGetContents(page))

#ifdef HNSW_BENCH
#define HnswBench(name, code) \
do { \
instr_time start; \
instr_time duration; \
INSTR_TIME_SET_CURRENT(start); \
(code); \
INSTR_TIME_SET_CURRENT(duration); \
INSTR_TIME_SUBTRACT(duration, start); \
elog(INFO, "%s: %.3f ms", name, INSTR_TIME_GET_MILLISEC(duration)); \
} while (0)
#else
#define HnswBench(name, code) (code)
#endif

#if PG_VERSION_NUM >= 150000
#define RandomDouble() pg_prng_double(&pg_global_prng_state)
#define SeedRandom(seed) pg_prng_seed(&pg_global_prng_state, seed)
Expand Down Expand Up @@ -106,6 +129,8 @@

/* Variables */
extern int hnsw_ef_search;
extern int hnsw_ef_stream;
extern bool hnsw_streaming;
extern int hnsw_lock_tranche_id;

typedef struct HnswElementData HnswElementData;
Expand All @@ -129,6 +154,7 @@ struct HnswElementData
uint8 heaptidsLength;
uint8 level;
uint8 deleted;
uint8 version;
uint32 hash;
HnswNeighborsPtr neighbors;
BlockNumber blkno;
Expand All @@ -155,12 +181,16 @@ struct HnswNeighborArray
HnswCandidate items[FLEXIBLE_ARRAY_MEMBER];
};

typedef struct HnswPairingHeapNode
typedef struct HnswSearchCandidate
{
HnswCandidate *inner;
pairingheap_node c_node;
pairingheap_node w_node;
} HnswPairingHeapNode;
HnswElementPtr element;
float distance;
} HnswSearchCandidate;

#define HnswGetSearchCandidate(membername, ptr) pairingheap_container(HnswSearchCandidate, membername, ptr)
#define HnswGetSearchCandidateConst(membername, ptr) pairingheap_const_container(HnswSearchCandidate, membername, ptr)

/* HNSW index options */
typedef struct HnswOptions
Expand Down Expand Up @@ -305,10 +335,10 @@ typedef struct HnswElementTupleData
uint8 type;
uint8 level;
uint8 deleted;
uint8 unused;
uint8 version;
ItemPointerData heaptids[HNSW_HEAPTIDS];
ItemPointerData neighbortid;
uint16 unused2;
uint16 unused;
Vector data;
} HnswElementTupleData;

Expand All @@ -317,18 +347,30 @@ typedef HnswElementTupleData * HnswElementTuple;
typedef struct HnswNeighborTupleData
{
uint8 type;
uint8 unused;
uint8 version;
uint16 count;
ItemPointerData indextids[FLEXIBLE_ARRAY_MEMBER];
} HnswNeighborTupleData;

typedef HnswNeighborTupleData * HnswNeighborTuple;

typedef union
{
struct pointerhash_hash *pointers;
struct offsethash_hash *offsets;
struct tidhash_hash *tids;
} visited_hash;

typedef struct HnswScanOpaqueData
{
const HnswTypeInfo *typeInfo;
bool first;
List *w;
visited_hash v;
pairingheap *discarded;
Datum q;
int m;
int64 tuples;
MemoryContext tmpCtx;

/* Support functions */
Expand Down Expand Up @@ -374,14 +416,14 @@ bool HnswCheckNorm(FmgrInfo *procinfo, Oid collation, Datum value);
Buffer HnswNewBuffer(Relation index, ForkNumber forkNum);
void HnswInitPage(Buffer buf, Page page);
void HnswInit(void);
List *HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement);
List *HnswSearchLayer(char *base, Datum q, List *ep, int ef, int lc, Relation index, FmgrInfo *procinfo, Oid collation, int m, bool inserting, HnswElement skipElement, visited_hash * v, pairingheap **discarded, bool initVisited, int64 *tuples);
HnswElement HnswGetEntryPoint(Relation index);
void HnswGetMetaPageInfo(Relation index, int *m, HnswElement * entryPoint);
void *HnswAlloc(HnswAllocator * allocator, Size size);
HnswElement HnswInitElement(char *base, ItemPointer tid, int m, double ml, int maxLevel, HnswAllocator * alloc);
HnswElement HnswInitElementFromBlock(BlockNumber blkno, OffsetNumber offno);
void HnswFindElementNeighbors(char *base, HnswElement element, HnswElement entryPoint, Relation index, FmgrInfo *procinfo, Oid collation, int m, int efConstruction, bool existing);
HnswCandidate *HnswEntryCandidate(char *base, HnswElement em, Datum q, Relation rel, FmgrInfo *procinfo, Oid collation, bool loadVec);
HnswSearchCandidate *HnswEntryCandidate(char *base, HnswElement em, Datum q, Relation rel, FmgrInfo *procinfo, Oid collation, bool loadVec);
void HnswUpdateMetaPage(Relation index, int updateEntry, HnswElement entryPoint, BlockNumber insertPage, ForkNumber forkNum, bool building);
void HnswSetNeighborTuple(char *base, HnswNeighborTuple ntup, HnswElement e, int m);
void HnswAddHeapTid(HnswElement element, ItemPointer heaptid);
Expand Down
10 changes: 8 additions & 2 deletions src/hnswinsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ GetInsertPage(Relation index)
* Check for a free offset
*/
static bool
HnswFreeOffset(Relation index, Buffer buf, Page page, HnswElement element, Size etupSize, Size ntupSize, Buffer *nbuf, Page *npage, OffsetNumber *freeOffno, OffsetNumber *freeNeighborOffno, BlockNumber *newInsertPage)
HnswFreeOffset(Relation index, Buffer buf, Page page, HnswElement element, Size etupSize, Size ntupSize, Buffer *nbuf, Page *npage, OffsetNumber *freeOffno, OffsetNumber *freeNeighborOffno, BlockNumber *newInsertPage, uint8 *tupleVersion)
{
OffsetNumber offno;
OffsetNumber maxoffno = PageGetMaxOffsetNumber(page);
Expand Down Expand Up @@ -98,6 +98,7 @@ HnswFreeOffset(Relation index, Buffer buf, Page page, HnswElement element, Size
{
*freeOffno = offno;
*freeNeighborOffno = neighborOffno;
*tupleVersion = etup->version;
return true;
}
else if (*nbuf != buf)
Expand Down Expand Up @@ -153,6 +154,7 @@ AddElementOnDisk(Relation index, HnswElement e, int m, BlockNumber insertPage, B
OffsetNumber freeOffno = InvalidOffsetNumber;
OffsetNumber freeNeighborOffno = InvalidOffsetNumber;
BlockNumber newInsertPage = InvalidBlockNumber;
uint8 tupleVersion;
char *base = NULL;

/* Calculate sizes */
Expand Down Expand Up @@ -202,7 +204,7 @@ AddElementOnDisk(Relation index, HnswElement e, int m, BlockNumber insertPage, B
}

/* Next, try space from a deleted element */
if (HnswFreeOffset(index, buf, page, e, etupSize, ntupSize, &nbuf, &npage, &freeOffno, &freeNeighborOffno, &newInsertPage))
if (HnswFreeOffset(index, buf, page, e, etupSize, ntupSize, &nbuf, &npage, &freeOffno, &freeNeighborOffno, &newInsertPage, &tupleVersion))
{
if (nbuf != buf)
{
Expand All @@ -212,6 +214,10 @@ AddElementOnDisk(Relation index, HnswElement e, int m, BlockNumber insertPage, B
npage = GenericXLogRegisterBuffer(state, nbuf, 0);
}

/* Set tuple version */
etup->version = tupleVersion;
ntup->version = tupleVersion;

break;
}

Expand Down
Loading