Product Recommendation Engine

Learn how to build a product recommendation engine using collaborative filtering and Pinecone.

In this example, we will generate product recommendations for ecommerce customers based on previous orders and trending items. This example covers creating and deploying the Pinecone service, adding pre-made embedding models for preprocessing, writing data to Pinecone, and finally querying Pinecone to receive a ranked list of recommended products.

Note: This example assumes you already have an embedding model that is dockerized and ready for uploading to the Pinecone model hub (see documentation).

Data Preparation

Import Python Libraries

!pip install --quiet numpy
!pip install --quiet pandas
!pip install --quiet scipy

import time
import random

import numpy as np
import pandas as pd

import scipy.sparse as sparse

Load (Example) Instacart Data

The data used throughout this example is a set of files describing customers' orders over time. The main focus is the orders.csv file, where each line represents a relation between a user and an order. In other words, each line has information on user_id (the user who made the order) and order_id. Note there is no product information in this table. Product information for specific orders is stored in the order_products__*.csv datasets.

import tarfile
tf = tarfile.open("data.tgz")
tf.extractall()
tf.close()

order_products_train = pd.read_csv('./data/order_products__train.csv')
order_products_prior = pd.read_csv('./data/order_products__prior.csv')
products = pd.read_csv('./data/products.csv')
orders = pd.read_csv('./data/orders.csv')

order_products = pd.concat([order_products_train, order_products_prior])

Prepare Data for the Model

The collaborative filtering model used in this example requires only users’ historical preferences for a set of items. Since there are no explicit ratings in the data we are using, the purchase count serves as a “confidence” measure of how strong the interaction was between a user and a product.

The dataframe data will store this data and will be the base for the model.

customer_order_products = pd.merge(orders, order_products, how='inner',on='order_id')

# creating a table with "confidences"
data = customer_order_products.groupby(['user_id', 'product_id'])[['order_id']].count().reset_index()
data.columns=["user_id", "product_id", "total_orders"]
data.product_id = data.product_id.astype('int64')

# Create a lookup frame so we can get the product names back in readable form later.
products_lookup = products[['product_id', 'product_name']].drop_duplicates()
products_lookup['product_id'] = products_lookup.product_id.astype('int64')
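Although scipy.sparse is imported above, `data` stays in long format here. Collaborative filtering libraries typically consume these interactions as a sparse user-item confidence matrix; a minimal sketch with toy values (the real `data` frame plugs in the same way):

```python
import pandas as pd
import scipy.sparse as sparse

# Toy long-format interactions, shaped like the `data` frame above.
toy = pd.DataFrame({
    'user_id':      [0, 0, 1, 2],
    'product_id':   [0, 1, 1, 2],
    'total_orders': [5, 1, 3, 2],
})

# Sparse user-by-item confidence matrix (rows: users, columns: products).
confidence = sparse.csr_matrix(
    (toy.total_orders, (toy.user_id, toy.product_id))
)
print(confidence.toarray())
# [[5 1 0]
#  [0 3 0]
#  [0 0 2]]
```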

We will create prototype users and add them to our data dataframe. Each user is buying the following products:

  • The first user is buying Mineral Water.
  • The second user is buying two baby products: No More Tears Baby Shampoo and Baby Wash & Shampoo.

These users will later be used for querying and examination of the model results.

data_new = pd.DataFrame([[data.user_id.max() + 1, 22802, 97],
                         [data.user_id.max() + 2, 26834, 89],
                         [data.user_id.max() + 2, 12590, 77]
                        ], columns=['user_id', 'product_id', 'total_orders'])
data_new

data = pd.concat([data, data_new]).reset_index(drop=True)
data.tail()

Build and Deploy the Product Recommendation Service

This section shows how to use Pinecone to easily build and deploy a product recommendation engine that turns raw data into vector embeddings, maintains a live index of those vectors, and returns recommended products on-demand.

Install and Set Up Pinecone

!pip install --quiet -U pinecone-client

import pinecone.graph
import pinecone.service
import pinecone.connector
import pinecone.hub

# Load Pinecone API key

api_key = '<YOUR API KEY HERE>'

# In this case, we store the API key in a file.
with open('api_key.txt') as infile:
    api_key = infile.read().strip()

pinecone.init(api_key=api_key)

Get a Pinecone API key if you don’t have one already.

Create a New Service

The typical workflow for creating a Pinecone service is:

  1. Define the processing steps for incoming requests, also known as a graph.
  2. Deploy the graph and wait for the corresponding named-service to become live.
  3. Create a connection to the service, and start sending insert and query requests.

Pinecone lets you add models for preprocessing raw data during writes (data upserts) and reads (queries). This is done by defining a graph, or the series of steps any upsert, delete, query, or fetch request must go through. Each step in the graph is a docker image that gets uploaded to the Pinecone model hub.

The most basic version of a graph simply loads already vectorized data (embedded elsewhere) into Pinecone. But why do that when you can add your models to the graph and have Pinecone embed raw data into vectors in real-time? That’s what we will do here with our two models which transform raw user and product data into vectors.
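The two preprocessors can be pictured as plain functions: one embeds products on the write path, the other embeds users on the read path, and the index ranks items by dot product. A toy sketch in plain Python, where the factor values and product names are made up for illustration (this is not the Pinecone API; in the real service these steps run as docker images inside the graph):

```python
import numpy as np

# Hypothetical item factors standing in for a trained CF model.
ITEM_FACTORS = {
    'mineral water': np.array([0.9, 0.1]),
    'baby shampoo':  np.array([0.1, 0.8]),
    'baby wash':     np.array([0.2, 0.9]),
}

def product_preprocessor(product_name):
    """Write path: turn a raw product into its vector embedding."""
    return ITEM_FACTORS[product_name]

def user_preprocessor(purchase_history):
    """Read path: embed a user as the mean of the items they bought."""
    return np.mean([ITEM_FACTORS[p] for p in purchase_history], axis=0)

# A query ranks every indexed item by dot product against the user vector.
user_vec = user_preprocessor(['baby shampoo', 'baby wash'])
ranked = sorted(ITEM_FACTORS,
                key=lambda p: float(user_vec @ product_preprocessor(p)),
                reverse=True)
print(ranked)  # baby products rank above mineral water
```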

Define Model Hub Image Builders

product_image_builder = pinecone.hub.ImageBuilder(
    image="product-preprocessor:v01",  # The name of the docker image (you should also tag the image)
    build_path="./docker_build/product_preprocessor/v01",  # path to which docker build artifacts are saved
    model_path='./product_preprocessor.py',  # main model file
    data_paths=['./product_preprocessor_model'],  # additional files or directories needed
)

user_image_builder = pinecone.hub.ImageBuilder(
    image="user-preprocessor:v01",  # The name of the docker image (you should also tag the image)
    build_path="./docker_build/user_preprocessor/v01",  # path to which docker build artifacts are saved
    model_path='./user_preprocessor.py',  # main model file
    data_paths=['./user_preprocessor_model'],  # additional files or directories needed
)

Assumption: Readymade Preprocessing Units

Here we assume the code of the preprocessing units was already created.

import tarfile
tf = tarfile.open("models_and_dockers.tgz")
tf.extractall()
tf.close()

Upload Preprocessors to the Model Hub

Preprocessors are docker images. The model hub API provides utilities for building the docker images and pushing them to the model hub.

# create docker images
!{product_image_builder.get_build_cmd()}
!{user_image_builder.get_build_cmd()}

# upload docker images to the model hub
if 'product-preprocessor' not in pinecone.hub.list_repositories() or 'v01' not in pinecone.hub.list_repository_tags('product-preprocessor'):
    !{product_image_builder.get_push_cmd()}
if 'user-preprocessor' not in pinecone.hub.list_repositories() or 'v01' not in pinecone.hub.list_repository_tags('user-preprocessor'):
    !{user_image_builder.get_push_cmd()}

Deploy the New Service

Next we deploy the service and display the updated graph.

graph = pinecone.graph.IndexGraph(metric='dotproduct')

# Name of the hub images
product_image_name = pinecone.hub.as_user_image(product_image_builder.image)
user_image_name = pinecone.hub.as_user_image(user_image_builder.image)

# Add to the graph functions that will transform the items and the queries.
product_preprocessor = pinecone.hub.HubFunction(name='product-preprocessor', image=product_image_name)
user_preprocessor = pinecone.hub.HubFunction(name='user-preprocessor', image=user_image_name)

graph.add_write_preprocessor(fn=product_preprocessor)
graph.add_read_preprocessor(fn=user_preprocessor)

# View the updated graph
graph.view()

service_name = 'als-instacart'
pinecone.service.deploy(service_name, graph, timeout=300)

pinecone.service.describe(service_name)

Connect to the Service

conn = pinecone.connector.connect(service_name)

conn.info()

Upload Product Names

Next we upload product names and demonstrate how to fetch embeddings.

products_lookup

items_to_upload = [(product, product) for product in products_lookup.product_id.tolist()]
display(items_to_upload[:5])

upsert_acks = conn.upsert(items=items_to_upload).collect()

conn.fetch(ids=["3"]).collect()

First we define a utility function.

# Utility function
def products_bought_by_user_in_the_past(user_id: int, top: int = 10):
    selected = data[data.user_id == user_id].sort_values(by=['total_orders'], ascending=False)
    selected['product_name'] = selected['product_id'].map(products_lookup.set_index('product_id')['product_name'])
    selected = selected[['product_id', 'product_name', 'total_orders']].reset_index(drop=True)
    return selected.head(top)

Then we look for the users we defined earlier and fetch their recommended products:

user_ids = [206210, 206211, 103593]

# Query by user embeddings
start_time = time.perf_counter()  # wall-clock time; process_time() would exclude time spent waiting on the network
cursor = conn.query(queries=user_ids, top_k=10)
query_results = cursor.collect()
print("Time needed for retrieving recommended products using Pinecone: " + str(time.perf_counter() - start_time))


# Print results
for _id, res in zip(user_ids, query_results):
    print(f'user_id={_id}')
    df = pd.DataFrame({'ids': res.ids,
                       'product': [products_lookup.loc[products_lookup.product_id == int(product_id)].product_name.values[0] for product_id in res.ids],
                       'scores': res.scores})
    print("Recommended:")
    display(df)
    print("Top buys:")
    display(products_bought_by_user_in_the_past(_id, top=15))

Sample product recommendations

The recommended products are indeed related to the users’ previous purchases.

If we want to exclude products the customer has already purchased in the past, we can add post-processing of the query result.

def transform(previous, query_results, products_lookup):
    """Move products a user already bought to the end of the result, with score 0.0."""
    output_query_result = query_results.copy()
    for e, res in enumerate(output_query_result):
        previous_names_for_user_e = set(previous[e].tolist())
        kept, demoted = [], []
        for _id, score in zip(res.ids, res.scores):
            product = products_lookup.loc[products_lookup.product_id == int(_id)].product_name.values[0]
            if product in previous_names_for_user_e:
                demoted.append((_id, 0.0))
            else:
                kept.append((_id, score))
        reordered = kept + demoted
        res.ids[:] = [i for i, _ in reordered]
        res.scores[:] = [s for _, s in reordered]

    return output_query_result

The updated query result is obtained in the next step. Here you can see recommended products that do not contain previous purchases.

new_query_result = transform([products_bought_by_user_in_the_past(_id, top = 100).product_name for _id in user_ids],
                             query_results,
                             products_lookup)

# Print results
for _id, res in zip(user_ids, new_query_result):
    print(f'user_id={_id}')
    df = pd.DataFrame({'ids': res.ids,
                       'product': [products_lookup.loc[products_lookup.product_id == int(product_id)].product_name.values[0] for product_id in res.ids],
                       'scores': res.scores})
    print("Recommended:")
    display(df)
    print("Top buys:")
    display(products_bought_by_user_in_the_past(_id, top=15))

Now, previously purchased products have a score of 0.0, although they still appear in the recommendation list.
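If you would rather drop previously purchased products entirely instead of leaving them at the end with score 0.0, a small filter over the id/score pairs does it. A minimal sketch, assuming plain lists of ids and scores like those in the query results above:

```python
def drop_zero_scores(ids, scores):
    """Keep only recommendations whose score was not zeroed out."""
    kept = [(i, s) for i, s in zip(ids, scores) if s > 0.0]
    return [i for i, _ in kept], [s for _, s in kept]

# Toy example: the middle recommendation was zeroed by the transform step.
ids, scores = drop_zero_scores(["12", "7", "3"], [0.9, 0.0, 0.4])
print(ids, scores)  # → ['12', '3'] [0.9, 0.4]
```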

Sample product recommendations

All that’s left to do is surface these recommendations on the shopping site, or feed them into other applications.

Cleanup

Stop the Pinecone service.

for svc in pinecone.service.ls():
    pinecone.service.stop(svc)

Summary

In this example notebook, we used Pinecone to quickly build and deploy a product recommendation engine based on collaborative filtering. Once deployed, the engine can process new data, retrieve recommendations in milliseconds, and send results to production applications.