Operationalize vector search with Pinecone and Feast Feature Store

Vector embeddings are the key ingredient that makes similarity search possible. Raw data flows from a data store or data stream through an embedding model, which converts it into a vector embedding, and finally into the vector search index.
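As a minimal sketch of that flow (the texts and the dict standing in for the index are illustrative placeholders, not part of this tutorial's setup):

from sentence_transformers import SentenceTransformer

# Illustrative pipeline: raw text -> embedding model -> vector index.
model = SentenceTransformer('average_word_embeddings_komninos')
texts = {'q1': 'How do I start investing?', 'q2': 'What is a vector database?'}

# A plain dict stands in for the vector index we build later with Pinecone.
toy_index = {doc_id: model.encode(text) for doc_id, text in texts.items()}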

Vector search with Pinecone

If you have multiple data sources and frequent data updates, and you are constantly experimenting with different models, it becomes harder to maintain an accurate and up-to-date search index. That could lead to subpar results in your recommender systems, search applications, or wherever else you use vector search.

How you store and manage these assets, the vector embeddings, is crucial to the accuracy and freshness of your vector search results. This is where “feature stores” come in. Feature stores provide a centralized place for managing vector embeddings within organizations with sprawling data sources and frequently updated models. They enable efficient feature engineering and management, feature reuse, and consistency between online and batch embedding models.

Combining a feature store with a similarity search service leads to more accurate and reliable retrieval within your AI/ML applications. In this article, we will build a question-answering application to demonstrate how the Feast feature store can be used alongside Pinecone’s vector search solution.

Vector search with Feast feature store and Pinecone

The steps are:

  1. Create a catalog of questions with known answers by loading the raw text and their vector embeddings into Feast.
  2. Index vector embeddings of those questions in Pinecone so we can search through them by semantic similarity.
  3. Transform new, incoming questions into vector embeddings and catalog them in Feast, then query Pinecone for the IDs of the most similar known questions, and finally fetch the text of those questions from Feast and display results to the user.

Let’s begin! You can also view the source code on GitHub.

Setup

Let’s install and load necessary Python packages in your preferred cloud environment, like Google Colab.

!pip install -qU feast
!pip install -qU sentence-transformers --no-cache-dir
!pip install -qU pinecone-client

If you are using Google Colab, please restart the runtime after the installation.

import os
import datetime
import pandas as pd
import numpy as np

Dataset and Model

We use the Quora Question Pairs Dataset to power a question-answering application. We index a set of questions that can be associated with answers. The application uses a new question’s vector embedding to retrieve the most relevant stored question and its associated answer.

The embeddings stored in the feature store are created using the Average Word Embeddings model (average_word_embeddings_komninos). Since we want to query new questions and find the most similar matches among the questions in the feature store, we need to create comparable vectors. This means that once we define a new question, we will transform it into a vector embedding using the same model.

from sentence_transformers import SentenceTransformer

model = SentenceTransformer('average_word_embeddings_komninos')
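As a quick sanity check (illustrative), encoding any string with this model yields a 300-dimensional vector, which is why the feature definitions below declare 300 embedding columns:

vec = model.encode('How do I learn Python?')
print(vec.shape)  # (300,) - matches the e_0 ... e_299 columns defined below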

Feast Feature Store

It’s time to set up our Feast feature store. We will follow the tutorial for creating a Feast feature store. We intend to use Feast for storing questions and their vector embeddings.

For each question, we will store the following information:

  • A question identifier number. We will store these ids along with the corresponding embeddings in the similarity search index.
  • The question’s text.
  • The question’s vector embedding. Here, the features are learned using a deep neural network and don’t have an intuitive meaning. Thus, we denote them with their index number, e_0 ... e_299.

Note that if our data contained the answers, we would store them along with this information as well.
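Put together, one row of the parquet file we load into Feast looks roughly like this (a sketch with made-up values; the column names match the feature definitions below):

import datetime
import numpy as np
import pandas as pd

# One illustrative row of questions.parquet
row = {
    'qid1': 1,                               # entity / join key
    'question1': 'How do I learn Python?',   # raw question text
    'datetime': datetime.datetime.utcnow(),  # event timestamp used by Feast
    **{f'e_{i}': float(v)                    # 300 embedding columns
       for i, v in enumerate(np.random.rand(300))},
}
pd.DataFrame([row]).head()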

# Initialize feast feature store
!feast init feature_repo
os.chdir('feature_repo')

Choose one of these two options to include the necessary file.

  • You can find the file questions.parquet as part of the example. This file contains pre-computed embeddings for each of the past questions, and we will load this data into our feature store. Please add the questions.parquet file to the /feature_repo/data path (see the snippet after this list).

  • You can run the code from the section Optional: Create Your Parquet File. Using this code, you can control the number of questions included in the example.
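If you went with the first option, a couple of lines like these put the file in place (a sketch that assumes the downloaded questions.parquet sits one directory above feature_repo, which is our working directory after the chdir above):

import os, shutil

# Assumption: questions.parquet was downloaded to the parent directory.
os.makedirs('data', exist_ok=True)
shutil.copy('../questions.parquet', 'data/questions.parquet')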

Once we have created the feature store and placed the parquet file in the expected location, we overwrite the default example.py file. This file defines the file source, the entity definition, and the feature view to serve online.

We will define another file - test_example.py, which will contain the feature view definition for the test questions. Test questions will be defined later, saved into a new parquet file, and loaded into a feature store.

Note: We added a one-day expiration to the feature views (notice the TTL field).
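The two files express that one-day TTL in two equivalent ways; as a side-by-side sketch (whether your Feast release accepts both forms is version-dependent):

from datetime import timedelta
from google.protobuf.duration_pb2 import Duration

ttl_as_timedelta = timedelta(days=1)       # form used in example.py
ttl_as_duration = Duration(seconds=86400)  # form used in test_example.py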

%%writefile ./example.py

from datetime import timedelta

from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource
import os
import platform

path = os.getcwd() + "/data/questions.parquet"
source = FileSource(
    path=path if platform.system() != 'Windows' else path.replace('/', '\\'),
    event_timestamp_column="datetime",
)

question = Entity(name="qid1", value_type=ValueType.INT64)

question_feature = Feature(
    name="question1",
    dtype=ValueType.STRING
)

embedding_features = [
    Feature(name=f"e_{i}", dtype=ValueType.FLOAT)
    for i in range(300)
]

questions_view = FeatureView(
    name="questions",
    entities=["qid1"],
    ttl=timedelta(days=1),
    features=[question_feature, *embedding_features],
    online=True,
    input=source,
)

Overwriting ./example.py

%%writefile ./test_example.py

from google.protobuf.duration_pb2 import Duration

from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource
import os
import platform

path = os.getcwd() + "/data/test_questions.parquet"
source = FileSource(
    path=path if platform.system() != 'Windows' else path.replace('/', '\\'),
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

test_question = Entity(name="qid1", value_type=ValueType.INT64, description="question id")

question_feature = Feature(
    name="question1",
    dtype=ValueType.STRING
)

embedding_features = [
    Feature(name=f"e_{i}", dtype=ValueType.FLOAT)
    for i in range(300)
]

test_questions_view = FeatureView(
    name="test_questions",
    entities=["qid1"],
    ttl=Duration(seconds=86400 * 1),
    features=[question_feature, *embedding_features],
    online=True,
    input=source,
    tags={},
)

Overwriting ./test_example.py

To deploy our infrastructure, we need to run the following command.

# Register the features
!feast apply

Finally, we need to populate the online store with the most recent features from the offline store. We can do that with the following command.

!feast materialize 2021-06-02T00:00:00 2021-07-10T00:00:00 --views questions

Note: Don’t forget to change the end date if you created the parquet file yourself!
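For instance, if you generated the parquet file yourself, its timestamps are the current time, so deriving the end date from "now" keeps the materialization window valid (a sketch; the one-day buffer is arbitrary):

import datetime

# End the window just past the newest event timestamps in the parquet file.
end = (datetime.datetime.utcnow() + datetime.timedelta(days=1)).strftime('%Y-%m-%dT%H:%M:%S')
print(f'feast materialize 2021-06-02T00:00:00 {end} --views questions')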

Uploading Vectors into Pinecone

After setting up our feature store, we are ready to index our question vectors within Pinecone’s similarity search service. Let’s start by defining a Pinecone index, and then uploading the stored vectors into Pinecone.

Pinecone Setup

import pinecone

Use your API key to connect to Pinecone. In case you don’t have one, get your API key here.

# Load Pinecone API key
api_key = os.getenv("PINECONE_API_KEY") or '<YOUR API KEY>'
pinecone.init(api_key=api_key)
pinecone.list_indexes()

Create a new vector index.

# Pick a name for the new index
index_name = 'feast-questions'
if index_name in pinecone.list_indexes():
    pinecone.delete_index(index_name)
# Create a new vector index
pinecone.create_index(name=index_name, metric='cosine', shards=1)

Upload from Feature Store

We fetch the questions' vectors from the feature store in batches and upload them into Pinecone’s vector index.

# Get question ids from the file
question_ids = pd.read_parquet('./data/questions.parquet', columns=['qid1'])
# Define a batch size to read from Feast
BATCH_SIZE = 1000
# Connect to the created index
index = pinecone.Index(name=index_name, response_timeout=300)
# Print index info
index.info()

InfoResult(index_size=0)

from feast import FeatureStore

store = FeatureStore(repo_path=".")

for start in range(0, len(question_ids), BATCH_SIZE):
    batch = question_ids[start: start + BATCH_SIZE]

    feature_vectors = store.get_online_features(
        feature_refs=[f'questions:e_{i}'
                      for i in range(300)
                     ],
        entity_rows=[{"qid1":_id} for _id in batch.qid1.to_list()]
    ).to_dict()

    # Prepare list of (id, vector) items to upload into Pinecone's index
    items_to_insert = []

    for e in range(len(feature_vectors['qid1'])):
        vector = [feature_vectors[f'questions__e_{i}'][e] for i in range(300)]
        items_to_insert.append((feature_vectors['qid1'][e], np.array(vector)))

    # Upsert batch data
    index.upsert(items=items_to_insert)

# Print index info
index.info()

InfoResult(index_size=10000)

Query

We are now all set to start querying our similarity search index. Our queries are questions in text format. We will transform each question into a vector embedding, send this query vector to Pinecone’s service, and retrieve a set of top-matched stored question IDs. Since Feast acts as the centralized source of truth for feature vectors, we will store the transformed question vectors in Feast and materialize them before forwarding them to Pinecone.

This section describes how to:

  • Define new questions and create their embeddings
  • Manage these embeddings in Feast:
    • Load these embeddings into Feast
    • Fetch test question embeddings from Feast
  • Query Pinecone with the fetched vector embeddings

Define New Questions and Create Their Embeddings

Let’s define new questions first.

df_new_questions = pd.DataFrame([[1000001, 'How can I make money using Youtube?'], 
                                 [1000002, 'What is the best book for learning Python?']], columns=['qid1', 'question1'])
df_new_questions

   qid1     question1
0  1000001  How can I make money using Youtube?
1  1000002  What is the best book for learning Python?

Then, we create embeddings for these questions and save them in a new parquet file.

# Create embedding for each question
df_new_questions['question_vector'] = df_new_questions.question1.apply(lambda x: model.encode(str(x), show_progress_bar=False))

# Create timestamps 
df_new_questions['created'] = datetime.datetime.utcnow()
df_new_questions['datetime'] = df_new_questions['created'].dt.floor('h')

# Generate columns for vector elements
df_new_questions2 = df_new_questions.question_vector.apply(pd.Series)
df_new_questions2.columns = [f'e_{i}' for i in range(300)]
result = pd.concat([df_new_questions, df_new_questions2], axis=1)

# Exclude some columns
result = result.drop(['question_vector'], axis=1)

# Change directory if needed
if os.getcwd().split('/')[-1] != 'feature_repo':
    os.chdir('feature_repo')

# Save to parquet file
result.to_parquet('./data/test_questions.parquet')

Manage the Embeddings in Feast

Recall that we created and deployed a feature view called test_questions earlier that loads the file we have just created.

We will make these questions accessible when querying the feature store online. Note that the test questions are timestamped with the current time, so make sure the end date of the materialization window below covers it.

!feast materialize 2021-06-02T00:00:00 2021-07-10T00:00:00 --views test_questions

Now that we have their embeddings in the feature store, we will show how you can fetch the questions using the ids.

# Fetch the feature store and get feature vectors for the query questions
store = FeatureStore(repo_path=".")

feature_vectors = store.get_online_features(
    feature_refs=['test_questions:question1',
                  *[f'test_questions:e_{i}'
                    for i in range(300)
                  ]],
    entity_rows=[{"qid1": _id} for _id in df_new_questions.qid1.tolist()]
).to_dict()

# Prepare list of vectors to query Pinecone
query_vectors = []

for e in range(len(feature_vectors['qid1'])):
    vector = [feature_vectors[f'test_questions__e_{i}'][e] for i in range(300)]
    query_vectors.append(np.array(vector))

Query Pinecone

Next, we query Pinecone and show the most similar questions (from the sample dataset).

# Query Pinecone's index
query_results = index.query(queries=query_vectors, top_k=5)

# Show results
for e, res in enumerate(query_results):
    print(e)
    print('\n\n\n Original question: ' + feature_vectors['test_questions__question1'][e])
    print('\n Most similar questions based on Pinecone vector search: \n')

    # Fetch from Feast to get question text
    result_feature_vectors = store.get_online_features(
        feature_refs=['questions:question1'],
        entity_rows=[{"qid1": int(_id)} for _id in res.ids]
    ).to_dict()

    # Prepare and display table
    df_result = pd.DataFrame({'id':res.ids,
                              'question': result_feature_vectors['questions__question1'],
                              'score':res.scores})
    display(df_result)
 Original question: How can I make money using Youtube?

 Most similar questions based on Pinecone vector search:

   id     question                                     score
0  1292   How do I make money with YouTube?            0.944259
1  14375  How do I make money using Instagram?         0.936641
2  1126   How can I earn money from YouTube?           0.866271
3  3759   How do you make money giving through a app?  0.864226
4  157    How can I make money through the Internet?   0.858337

 Original question: What is the best book for learning Python?

 Most similar questions based on Pinecone vector search:

   id     question                                                               score
0  10033  What is the best Python learning book for beginners?                  0.945661
1  16072  Which is the best book for learning Python 3 for absolute beginners?  0.872750
2  13142  What's the best way to learn python on my own?                        0.847575
3  8939   Which is the best book for learning android programming from sratch?  0.845041
4  7023   What is the best beginner friendly book on python?                    0.829327

Turn off the Pinecone Service

Delete the index once you are sure that you do not want to use it anymore. Once the index is deleted, it cannot be used again and its data cannot be recovered.

pinecone.delete_index(index_name)

Summary

We demonstrated the integration between two emerging core ML/AI infrastructure technologies, feature stores and vector similarity search engines.

These technologies deal with feature vectors, the core information unit of any AI/ML application. Feature stores are responsible for all operational aspects of feature vectors, while similarity search engines enable numerous applications relying on semantic retrieval of those vectors.


Optional: Create Your Parquet File

This section presents the code for creating a questions.parquet file for the feature store. The default parquet file we provided contains a sample of 10,000 questions. Using the following code, you can create a questions.parquet file with a different number of questions. That way, you can see what happens with fewer or more questions.

# Download dataset
import requests, os

DATA_DIR = "tmp"
QA_FILE = f"{DATA_DIR}/quora_duplicate_questions.tsv"
QA_URL = "https://qim.fs.quoracdn.net/quora_duplicate_questions.tsv"


def download_data():
    # Download the Quora Question Pairs TSV if it is not already present
    os.makedirs(DATA_DIR, exist_ok=True)

    if not os.path.exists(QA_FILE):
        r = requests.get(QA_URL)
        with open(QA_FILE, "wb") as f:
            f.write(r.content)
pd.set_option('display.max_colwidth', 500)
df = pd.read_csv(QA_FILE, sep='\t',  usecols=["qid1", "question1"], index_col=False)
df = df.reset_index(drop=True)
df.drop_duplicates(inplace=True)
df.head()
   qid1  question1
0  1     What is the step by step guide to invest in share market in india?
1  3     What is the story of Kohinoor (Koh-i-Noor) Diamond?
2  5     How can I increase the speed of my internet connection while using a VPN?
3  7     Why am I mentally very lonely? How can I solve it?
4  9     Which one dissolve in water quikly sugar, salt, methane and carbon di oxide?
# Set any value for number of questions
NUM_OF_QUESTIONS = 10000
# Or select the complete dataset
#NUM_OF_QUESTIONS = len(df)
import datetime

# Use only defined number of rows
df = df[:NUM_OF_QUESTIONS]

# Create embedding for each question
df['question_vector'] = df.question1.apply(lambda x: model.encode(str(x)))

# Create timestamps 
df['created'] = datetime.datetime.utcnow()
df['datetime'] = df['created'].dt.floor('h')

# Generate columns for vector elements
df2 = df.question_vector.apply(pd.Series)
df2.columns = [f'e_{i}' for i in range(300)]
result = pd.concat([df, df2], axis=1)

# Exclude some columns
result = result.drop(['question_vector'], axis=1)

# Change directory if needed
if os.getcwd().split('/')[-1] != 'feature_repo':
    os.chdir('feature_repo')
    
# Save to parquet file
result[:NUM_OF_QUESTIONS].to_parquet('./data/questions.parquet')

What will you build?

Upgrade your search or recommendation systems with just a few lines of code, or contact us for help.
