
Commit 2b352bd

add guide for distributed autofaiss (#98)
1 parent fc7ba51 commit 2b352bd

3 files changed: +233 -1 lines changed

Diff for: Makefile

+1 -1

@@ -30,7 +30,7 @@ build-dist: ## [Continuous integration] Build package for pypi
 build-pex:
 	python3 -m venv .pexing
 	. .pexing/bin/activate && python -m pip install -U pip && python -m pip install pex
-	. .pexing/bin/activate && python -m pex setuptools . -o autofaiss.pex -v
+	. .pexing/bin/activate && python -m pex setuptools pyspark==3.2.1 s3fs>=2022.1.0 . -o autofaiss.pex -v
 	rm -rf .pexing

 .PHONY: help

Diff for: README.md

+8

@@ -88,13 +88,19 @@ However, this mapping will be stored in RAM... We advise you to create your own
 numpy array and then call .reconstruct_from_offset() with your custom direct_map.

 ## Using autofaiss with pyspark
+
 Autofaiss allows users to build indices in Spark, you need to do the following steps:

 1. Install pyspark by `pip install pyspark`.
 2. Prepare your embeddings files.
 3. Create a spark session before using `build_index` (optional), if you don't create it, a default session would
 be created with the least configuration.
+
+Also see [distributed_autofaiss.md](docs/distributed/distributed_autofaiss.md) for a full guide of how to use autofaiss in distributed mode.
+
+
 ### Producing N indices
+
 In the distributed mode, you can generate a set of indices with the total memory larger than your current available
 memory by setting `nb_indices_to_keep` different from 1.
 For example, if you set `nb_indices_to_keep` to 10 and your `index_path` is `knn.index`, you are expected to produce 10
@@ -106,6 +112,8 @@ indices at the end of `build_index` with the followings names:
 - `knn.index10`

 A [concrete example](examples/distributed_autofaiss_n_indices.py) shows how to produce N indices and how to use them.
+
+
 ## Using the command line

 Create embeddings

Diff for: docs/distributed/distributed_autofaiss.md

+224

# distributed autofaiss

If you want to generate an index from billions of embeddings, this guide is for you.

It explains how to use pyspark to run autofaiss on multiple nodes.

You may also be interested in [distributed img2dataset](https://github.com/rom1504/img2dataset/blob/main/examples/distributed_img2dataset_tutorial.md)
and [distributed clip inference](https://github.com/rom1504/clip-retrieval/blob/main/docs/distributed_clip_inference.md).

We will assume ubuntu 20.04.

## Setup the master node

On the master node:

First download spark:
```bash
wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
tar xf spark-3.2.1-bin-hadoop3.2.tgz
```

Then download autofaiss:
```bash
rm -rf autofaiss.pex
wget https://github.com/criteo/autofaiss/releases/latest/download/autofaiss-3.8.pex -O autofaiss.pex
chmod +x autofaiss.pex
```

If the master node cannot open ports that are visible from your local machine, you can create a tunnel between your local machine and the master node to be able to see the spark UI (at http://localhost:8080):
```bash
ssh -L 8080:localhost:8080 -L 4040:localhost:4040 master_node
```
Replace `master_node` by an ip/host.

## Setup the worker nodes

### ssh basic setup

Still on the master node, create an ips.txt file with the IPs of all the worker nodes, one per line.
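
For example, a three-worker ips.txt could look like this (the addresses below are placeholders):
```
172.31.35.190
172.31.35.191
172.31.35.192
```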

```bash
ssh-keyscan `cat ips.txt` >> ~/.ssh/known_hosts
```

You may use a script like this to fill your .ssh/config file:
```python
def generate(ip):
    print(
        f"Host {ip}\n"
        f"    HostName {ip}\n"
        "    User ubuntu\n"
        "    IdentityFile ~/yourkey.pem"
    )

with open("ips.txt") as f:
    lines = f.readlines()
    for line in lines:
        generate(line.strip())
```

Save it as generate.py, then run:
```bash
python3 generate.py >> ~/.ssh/config
```

Install pssh with `sudo apt install pssh`.

Pick the right username (USER) for the worker nodes:
```bash
USER=ubuntu
```

Optionally, if a node other than the current one needs access to the worker nodes, you may have to add its ssh key to all the nodes with:
```bash
for IP in `cat ips.txt`
do
    ssh-copy-id -i the_new_id_rsa $USER@$IP
done
```

Check that you can connect to all the nodes with:
```bash
parallel-ssh -l $USER -i -h ips.txt uname -a
```

### Install some packages

```bash
parallel-ssh -l $USER -i -h ips.txt "sudo apt update"
parallel-ssh -l $USER -i -h ips.txt "sudo apt install openjdk-11-jre-headless libgl1 htop tmux bwm-ng sshfs python3-distutils python3-apt python3.8 -y"
```

### [Optional] Network setting on aws

On aws, the master node and the worker nodes should be in the same VPC and security group, and the security group should allow inbound traffic within the group, so the nodes can communicate.
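
For example, with the aws cli you could allow all TCP traffic between instances that share a security group (the group id below is a placeholder for your own; this is only one way to do it):
```bash
aws ec2 authorize-security-group-ingress \
  --group-id sg-0123456789abcdef0 \
  --protocol tcp \
  --port 0-65535 \
  --source-group sg-0123456789abcdef0
```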

### Download autofaiss on all nodes

Download autofaiss on all nodes by retrying this N times until parallel-ssh reports success for all of them:
```bash
parallel-ssh -i -h ips.txt "rm -rf autofaiss.pex"
parallel-ssh -i -h ips.txt "wget https://github.com/criteo/autofaiss/releases/latest/download/autofaiss-3.8.pex -O autofaiss.pex"
parallel-ssh -i -h ips.txt "chmod +x autofaiss.pex"
```

### Download spark on workers

```bash
parallel-ssh -l $USER -i -h ips.txt "wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz"
parallel-ssh -l $USER -i -h ips.txt "tar xf spark-3.2.1-bin-hadoop3.2.tgz"
```

### Start the master node

When you're ready, you can start the master node with:

```bash
./spark-3.2.1-bin-hadoop3.2/sbin/start-master.sh -p 7077
```

### Start the worker nodes

When you're ready, you can start the worker nodes with:

```bash
parallel-ssh -l $USER -i -h ips.txt './spark-3.2.1-bin-hadoop3.2/sbin/start-worker.sh -c 16 -m 28G "spark://172.31.35.188:7077"'
```

Replace 172.31.35.188 by the master node ip.

### Stop the worker nodes

When you're done, you can stop the worker nodes with:

```bash
parallel-ssh -l $USER -i -h ips.txt "rm -rf ~/spark-3.2.1-bin-hadoop3.2/work/*"
parallel-ssh -l $USER -i -h ips.txt "pkill java"
```

### Stop the master node

When you're done, you can stop the master node with:

```bash
pkill java
```

### Running autofaiss on it

Once your spark cluster is set up, you're ready to start autofaiss in distributed mode.
Make sure to open your spark UI at http://localhost:8080 (or at the ip where the master node is running).

Save the script below to indexing.py, then run `./autofaiss.pex indexing.py`:

```python
import os

from autofaiss import build_index
from pyspark.sql import SparkSession


def create_spark_session():
    # the pex file must be available at this path on all worker nodes
    os.environ["PYSPARK_PYTHON"] = "/home/ubuntu/autofaiss.pex"
    spark = (
        SparkSession.builder
        .config("spark.submit.deployMode", "client")
        .config("spark.executorEnv.PEX_ROOT", "./.pex")
        # .config("spark.executor.cores", "16")
        # .config("spark.cores.max", "48")  # you can reduce this number if you want to use only some cores; if you're using yarn the option name is different, check the spark doc
        .config("spark.task.cpus", "16")
        .config("spark.driver.port", "5678")
        .config("spark.driver.blockManager.port", "6678")
        .config("spark.driver.host", "172.31.35.188")
        .config("spark.driver.bindAddress", "172.31.35.188")
        .config("spark.executor.memory", "18G")  # make sure to increase this if you're using more cores per executor
        .config("spark.executor.memoryOverhead", "8G")
        .config("spark.task.maxFailures", "100")
        .master("spark://172.31.35.188:7077")  # this should point to your master node; if using the tunnelling version, keep this to localhost
        .appName("spark-stats")
        .getOrCreate()
    )
    return spark


spark = create_spark_session()

index, index_infos = build_index(
    embeddings="hdfs://root/path/to/your/embeddings/folder",
    distributed="pyspark",
    file_format="parquet",
    max_index_memory_usage="16G",
    current_memory_available="24G",
    temporary_indices_folder="hdfs://root/tmp/distributed_autofaiss_indices",
    index_path="hdfs://root/path/to/your/index/knn.index",
    index_infos_path="hdfs://root/path/to/your/index/infos.json",
)
```
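
The embeddings folder must already exist before calling `build_index`. As a minimal, hypothetical sketch of step 2 of the README ("Prepare your embeddings files"), this is one way to write a folder of `.npy` embedding files usable with `file_format="npy"` (file names, shapes and the number of files are illustrative; the parquet layout used above is different):

```python
import os

import numpy as np

# write two .npy files of float32 embeddings into a local folder
os.makedirs("embeddings_folder", exist_ok=True)
for i in range(2):
    embeddings = np.random.rand(10_000, 768).astype("float32")
    np.save(f"embeddings_folder/emb_{i}.npy", embeddings)
```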

Another example:

```python
index, index_infos = build_index(
    embeddings=[
        "s3://laion-us-east-1/embeddings/vit-l-14/laion2B-en/img_emb",
        "s3://laion-us-east-1/embeddings/vit-l-14/laion2B-multi/img_emb",
        "s3://laion-us-east-1/embeddings/vit-l-14/laion1B-nolang/img_emb",
    ],
    distributed="pyspark",
    max_index_memory_usage="200G",
    current_memory_available="24G",
    nb_indices_to_keep=10,
    file_format="npy",
    temporary_indices_folder="s3://laion-us-east-1/mytest/my_tmp_folder5",
    index_path="s3://laion-us-east-1/indices/vit-l-14/image/knn.index",
    index_infos_path="s3://laion-us-east-1/indices/vit-l-14/image/infos.json"
)
```
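
Since `nb_indices_to_keep=10`, this produces `knn.index01` ... `knn.index10` (see "Producing N indices" in the README). Below is only a rough sketch of querying them, assuming the pieces have been downloaded locally, that the metric is L2 (for inner product, sort scores in descending order), and that ids are globally unique across the pieces; the maintained example is examples/distributed_autofaiss_n_indices.py:

```python
import faiss
import numpy as np

queries = np.random.rand(4, 768).astype("float32")  # 4 fake query vectors
k = 5

# search the pieces one by one and collect the per-piece results
all_distances, all_ids = [], []
for i in range(1, 11):
    piece = faiss.read_index(f"knn.index{i:02d}")  # loads one piece into RAM
    distances, ids = piece.search(queries, k)
    all_distances.append(distances)
    all_ids.append(ids)
    del piece

distances = np.concatenate(all_distances, axis=1)
ids = np.concatenate(all_ids, axis=1)

# keep the k best results per query across all pieces
order = np.argsort(distances, axis=1)[:, :k]
top_ids = np.take_along_axis(ids, order, axis=1)
print(top_ids)
```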

## Benchmark

Computing a 168GB multi-piece `OPQ24_168,IVF131072_HNSW32,PQ24x8` index on 5,550,336,490 embeddings of dimension 768, using 10 nodes with 16 cores each (c6i.4xlarge), takes about 6h.
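
A back-of-the-envelope check of that index size, assuming roughly 24 code bytes per vector for PQ24x8 plus about 8 bytes of id bookkeeping per vector in the inverted lists (the HNSW coarse quantizer over 131072 centroids is comparatively tiny):

```python
# rough size estimate only; the exact faiss layout differs slightly
n_vectors = 5_550_336_490
code_bytes = 24  # PQ24x8 stores 24 one-byte codes per vector
id_bytes = 8     # approximate per-vector id overhead in the inverted lists

total_gb = n_vectors * (code_bytes + id_bytes) / 1e9
print(f"~{total_gb:.0f} GB")  # ~178 GB, the same order of magnitude as the reported 168GB
```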
