Source code for bdc.steps.google_places_detailed

# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Lucca Baumgärtner <lucca.baumgaertner@fau.de>
# SPDX-FileCopyrightText: 2023 Sophie Heasman <sophieheasmann@gmail.com>
# SPDX-FileCopyrightText: 2023 Ruchita Nathani <Ruchita.nathani@fau.de>
# SPDX-FileCopyrightText: 2023 Ahmed Sheta <ahmed.sheta@fau.de>

import json
import os
from http import HTTPStatus

import boto3
import googlemaps
import pandas as pd
from googlemaps.exceptions import ApiError, HTTPError, Timeout, TransportError
from requests import RequestException
from tqdm import tqdm

from bdc.steps.helpers import get_lead_hash_generator
from bdc.steps.step import Step, StepError
from config import GOOGLE_PLACES_API_KEY
from database import get_database
from logger import get_logger

log = get_logger()


class GooglePlacesDetailed(Step):
    """
    The GooglePlacesDetailed step will try to gather detailed information for a
    given google business entry, identified by the place ID. This information
    could be the website link, the review text and the business type. Reviews
    will be saved to a separate location based on the persistence settings;
    this could be local or AWS S3.

    Attributes:
        name: Name of this step, used for logging
        added_cols: List of fields that will be added to the main dataframe by
            executing this step
        required_cols: List of fields that are required to be existent in the
            input dataframe before performing this step

    Added Columns:
        google_places_detailed_website (str): The website of the company from
            google places
        google_places_detailed_type (str): The type of the company from google
            places
    """

    name = "Google_Places_Detailed"

    # fields that are expected as an output of the df.apply lambda function
    df_fields = ["website", "type"]

    # Names bound in a class body are not visible inside the body of a
    # comprehension; only the first iterable is evaluated in class scope.
    # The prefix is therefore computed inside that iterable and zipped with
    # the field names instead of writing f"{name}_{field}" directly.
    added_cols = [
        prefix + field
        for (prefix, field) in zip(
            [f"{name.lower()}_"] * len(df_fields),
            df_fields,
        )
    ]

    required_cols = ["google_places_place_id"]

    # fields that are accessed directly from the api
    api_fields = ["website", "type", "reviews"]

    # Output fields are not necessarily the same as input fields
    api_fields_output = ["website", "types"]

    # Google Maps client; created in load_data()
    gmaps = None

    def load_data(self) -> None:
        """
        Create the Google Maps client used by this step.

        Raises:
            StepError: If no Google Places API key is configured.
        """
        # don't perform this in class body or else it will fail in tests due
        # to missing API key
        if GOOGLE_PLACES_API_KEY is None:
            raise StepError("An API key for Google Places is needed to run this step!")
        self.gmaps = googlemaps.Client(key=GOOGLE_PLACES_API_KEY)

    def verify(self) -> bool:
        """
        Return True if the base verification passes and an API key is set.
        """
        return super().verify() and GOOGLE_PLACES_API_KEY is not None

    def run(self) -> pd.DataFrame:
        """
        Query the detailed places API for every lead and store the results in
        the step's output columns.

        Each row is routed through the lead hash generator's ``hash_check``,
        which receives the row, the fetch function, the step name and the
        list of output columns.

        Returns:
            The dataframe of this step including the added columns.
        """
        # Call places API
        tqdm.pandas(desc="Getting info from Places API")

        # Same for every row, so build the column list once
        output_cols = [f"{self.name.lower()}_{field}" for field in self.df_fields]

        self.df[output_cols] = self.df.progress_apply(
            lambda lead: get_lead_hash_generator().hash_check(
                lead,
                self.get_data_from_detailed_google_api,
                self.name,
                output_cols,
                lead,
            ),
            axis=1,
        )

        return self.df

    def finish(self) -> None:
        """
        Nothing to clean up for this step.
        """
        pass

    def get_data_from_detailed_google_api(self, lead_row) -> pd.Series:
        """
        Fetch the detailed places entry for a single lead.

        Reviews contained in the response are handed to the database's
        ``save_review``; the remaining output fields are returned.

        Args:
            lead_row: Row of the lead dataframe; must provide the key
                ``google_places_place_id``.

        Returns:
            A series holding one value per entry of ``api_fields_output``
            (None for fields missing in the response). If the place ID is
            missing, the request fails or the response status is not OK, a
            series of None values with one entry per ``df_fields`` element is
            returned instead.
        """
        error_return_value = pd.Series([None] * len(self.df_fields))

        place_id = lead_row["google_places_place_id"]

        if place_id is None or pd.isna(place_id):
            return error_return_value

        # Call for the detailed API using specified fields
        try:
            # Fetch place details including reviews
            response = self.gmaps.place(
                place_id,
                fields=self.api_fields,
                language="original",
                reviews_no_translations=True,
            )
        except RequestException as e:
            log.error(f"Error: {str(e)}")
            # Without a response there is nothing to process; returning here
            # avoids reading an unbound `response` below.
            return error_return_value
        except (ApiError, HTTPError, Timeout, TransportError) as e:
            error_message = (
                str(e.message)
                if hasattr(e, "message") and e.message is not None
                else str(e)
            )
            log.warning(f"Error: {error_message}")
            return error_return_value

        # Check response status
        if response.get("status") != HTTPStatus.OK.name:
            log.warning(f"Failed to fetch data. Status code: {response.get('status')}")
            return error_return_value

        # Guard against a response without a result payload
        result = response.get("result") or {}

        reviews = result.get("reviews", [])
        get_database().save_review(reviews, place_id)

        results_list = [result.get(field) for field in self.api_fields_output]

        return pd.Series(results_list)