Commit a4218d1

Benchmark script for base64 ingest
1 parent 50fe720 commit a4218d1

File tree

.gitignore
package.json
test/ingest/base64.mjs

3 files changed: 148 additions & 0 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -72,3 +72,4 @@ rest-api-spec
 yaml-rest-tests
 generated-tests
 schema
+base64-data

package.json

Lines changed: 1 addition & 0 deletions
@@ -70,6 +70,7 @@
   "chai": "5.2.1",
   "cross-zip": "4.0.1",
   "desm": "1.3.1",
+  "inly": "^5.0.1",
   "into-stream": "8.0.1",
   "js-yaml": "4.1.0",
   "license-checker": "25.0.1",

test/ingest/base64.mjs

Lines changed: 146 additions & 0 deletions
@@ -0,0 +1,146 @@
/*
 * Copyright Elasticsearch B.V. and contributors
 * SPDX-License-Identifier: Apache-2.0
 */

import { readFile } from "node:fs/promises"
import { existsSync, createWriteStream, mkdirSync } from "node:fs"
import { Readable } from "node:stream"
import { finished } from "node:stream/promises"
import inly from 'inly'
import { Client } from '../../index.js'
import { Serializer } from '@elastic/transport'

const client = new Client({
  node: process.env.ES_URL,
  auth: {
    username: process.env.ES_USERNAME,
    password: process.env.ES_PASSWORD,
  },
  compression: false
})

const indexName = "b64-test"

const indexSettings = {
  index: indexName,
  wait_for_active_shards: 'all',
  mappings: {
    properties: {
      emb: { type: 'dense_vector', dims: 1536, index: true },
      docid: { type: 'keyword' },
      title: { type: 'text' },
      text: { type: 'text' }
    }
  }
}

const dataset_size = 20000

/**
 * Fetches the vector dataset
 */
async function fetchDataSet () {
  const url = 'https://rally-tracks.elastic.co/openai_vector/open_ai_corpus-initial-indexing-1k.json.bz2'
  const dir = 'base64-data'
  const filePath = `./${dir}/open_ai_corpus-initial-indexing-1k.json`
  const filePathBz2 = `./${dir}/open_ai_corpus-initial-indexing-1k.json.bz2`

  if (!existsSync(filePath)) {
    mkdirSync(dir, { recursive: true })

    // download archive
    if (!existsSync(filePathBz2)) {
      console.log(`Downloading ${url}`)
      const { body } = await fetch(url)
      const stream = createWriteStream(filePathBz2)
      await finished(Readable.fromWeb(body).pipe(stream))
    }

    // extract archive
    await new Promise((resolve, reject) => {
      console.log(`Extracting ${filePathBz2} to ${dir}`)
      const extract = inly(filePathBz2, dir)
      extract.on('error', reject)
      extract.on('end', resolve)
    })
  }

  return await readFile(filePath, 'utf8')
}

/**
 * Loops over an array until a certain number of records has been yielded
 */
function* loopDataSet (data) {
  let count = 0
  while (true) {
    for (const item of data) {
      yield item
      count++
      if (count >= dataset_size) return
    }
  }
}

/**
 * Bulk ingest the dataset
 * @param {number} chunkSize number of documents to serialize before running a bulk request
 * @param {boolean} base64 If true, encode float32 embeddings array as a base64 string
 * @returns {number} Milliseconds the serialize+index operations took
 */
async function index (chunkSize, base64 = false) {
  const raw = await fetchDataSet()
  const serializer = new Serializer()
  let chunk = []

  await client.indices.create(indexSettings)

  const start = Date.now()

  const lines = raw.split(/[\r\n]+/).filter(row => row.trim().length > 0)
  for (const line of loopDataSet(lines)) {
    const doc = JSON.parse(line)
    if (base64) doc.emb = serializer.encodeFloat32Vector(doc.emb)
    chunk.push(doc)
    if (chunk.length >= chunkSize) {
      const operations = chunk.flatMap(doc => [{ index: { _index: indexName } }, doc])
      await client.bulk({ operations })
      chunk = []
    }
  }

  const duration = Date.now() - start

  await client.indices.delete({ index: indexName })

  return duration
}

async function run () {
  const measurements = []

  for (const chunk_size of [100, 250, 500, 1000]) {
    const measurement = { dataset_size, chunk_size }

    const float32Duration = []
    const base64Duration = []

    for (const _ of [1, 2, 3]) {
      float32Duration.push(await index(chunk_size))
      base64Duration.push(await index(chunk_size, true))
    }

    measurement.float32 = { duration: float32Duration.reduce((a, b) => a + b, 0) / float32Duration.length }
    measurement.base64 = { duration: base64Duration.reduce((a, b) => a + b, 0) / base64Duration.length }

    measurements.push(measurement)
  }

  console.log(JSON.stringify(measurements, null, 2))
}

run().catch(err => {
  console.error(err)
  process.exit(1)
})
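
For context, encodeFloat32Vector comes from the Serializer in @elastic/transport; the benchmark compares sending each embedding as a plain JSON array of floats against sending it as a compact base64 string. A minimal sketch of the encoding idea, for illustration only (not the transport's actual implementation):

/*
 * Sketch: base64-encoding a float32 vector.
 * Packs the values into a platform-endian byte buffer
 * (little-endian on typical hardware) and base64-encodes those bytes.
 */
function encodeFloat32VectorSketch (vector) {
  const floats = Float32Array.from(vector) // 4 bytes per element
  return Buffer.from(floats.buffer, floats.byteOffset, floats.byteLength).toString('base64')
}

// A 1536-dim embedding is 6,144 bytes of float32 data,
// which base64-encodes to exactly 8,192 characters.
console.log(encodeFloat32VectorSketch([0.1, -0.25, 0.5]))

With a reachable cluster, the script can be run with placeholder credentials along the lines of ES_URL=https://localhost:9200 ES_USERNAME=elastic ES_PASSWORD=changeme node test/ingest/base64.mjs. It prints one object per chunk size of the shape { dataset_size, chunk_size, float32: { duration }, base64: { duration } }, where each duration is the average of three runs in milliseconds. Note that the chunk-flush condition only sends full chunks; that is fine here because dataset_size (20000) is a multiple of every configured chunk size.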
