SEO Python Codebase — Recursive Semantic Clustering Output
This page presents a Python notebook implementing a recursive SEO and keyword-analysis workflow. It includes Google Custom Search collection, robots handling, text processing, transformer-based language tooling, keyword extraction, word-frequency analysis, network analysis, and Apriori-style association discovery. The content below is rendered as static, crawlable HTML rather than a full Jupyter interface.
Python block 1
# Standard libraries
import requests
import time
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser
from collections import Counter
import gc
import re
import ipywidgets as widgets
from IPython.display import display
import string
from func_timeout import func_timeout, FunctionTimedOut
from typing import Dict, Any, List
# Data manipulation
import pandas as pd
import numpy as np
# Web scraping
from bs4 import BeautifulSoup
# Natural Language Processing (NLP) libraries
import nltk
from nltk.corpus import stopwords, words
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk import pos_tag
# nltk.download('universal_tagset')
# Transformers and NLP models
import torch
import transformers
from transformers import (T5Tokenizer, T5ForConditionalGeneration, pipeline, RobertaTokenizer, RobertaForCausalLM,
                          RobertaForSequenceClassification, BartForConditionalGeneration, BartTokenizer,
                          AutoTokenizer, MT5ForConditionalGeneration)
from sentence_transformers import SentenceTransformer
from keybert import KeyBERT
# Text processing and similarity
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# Association rule learning library
from apyori import apriori  # NOTE(review): this name is shadowed by the mlxtend `apriori` imported below; confirm apyori is still needed.
# Network analysis and community detection
import networkx as nx
import community as community_louvain
# Visualization
import matplotlib.pyplot as plt
import pandas as pd  # NOTE: duplicate import (pandas already imported above).
import time  # NOTE: duplicate import (time already imported above).
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from mlxtend.frequent_patterns import apriori, association_rules  # This `apriori` is the one in scope for the mining code below.
from ipywidgets import widgets  # NOTE: duplicate of the `widgets` import above.
from IPython.display import display, clear_output  # NOTE: re-imports display from above, adds clear_output.
from sklearn.preprocessing import MultiLabelBinarizer
import warnings
Python block 2
# Timestamp the start of the run (local time, minute precision).
time.strftime('%Y-%m-%d %H:%M', time.localtime())
Keywords
Python block 3
def get_search_results(keyword, num_results=10):
    """Collect up to `num_results` result URLs from Google Custom Search.

    Relies on the module-level API_KEY and SEARCH_ENGINE_ID credentials.
    Pages through the API 10 results at a time (the API's per-call maximum).

    Args:
        keyword: Query string to search for.
        num_results: Maximum number of URLs to return.

    Returns:
        List of result URLs; may be shorter than `num_results` when the API
        runs out of results or a request fails.
    """
    search_results = []
    for start_index in range(1, num_results + 1, 10):
        params = {
            'key': API_KEY,
            'cx': SEARCH_ENGINE_ID,
            'q': keyword,
            'start': start_index,
            'num': min(num_results - len(search_results), 10),
        }
        try:
            response = requests.get(
                'https://www.googleapis.com/customsearch/v1',
                params=params,
                timeout=15,  # don't let one stalled call hang the whole collection run
            )
            # Fail fast on HTTP errors instead of json-decoding an error page.
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            print(f"Search request failed for '{keyword}' (start={start_index}): {e}")
            break
        if 'items' not in data:
            break  # No more results
        search_results.extend(item['link'] for item in data['items'])
        if len(search_results) >= num_results:
            break  # Already collected enough
    # Trim in case the final page pushed us past the requested count.
    return search_results[:num_results]
Python block 4
# Seed search queries: twenty dark-art/horror aesthetic themes that are fed
# to Google Custom Search one at a time.
keywords = [
    "Corrupted Folklore Horror Art",
    "Ritualistic Horror and Occult Aesthetics",
    "Unholy Carnival and Twisted Clowns",
    "Blasphemous Iconography in Surreal Art",
    "Dark Occult Surrealism with Gothic Overtones",
    "Grotesque Ritual and Nightmare Ceremonies",
    "Cursed Performers and Macabre Masquerades",
    "Psychedelic Occult Horror with Symbolic Decay",
    "Lowbrow Horror Art with Religious Subversion",
    "Gothic Horror with Surrealist Rituals",
    "Pop Surrealism Meets Cosmic Horror",
    "Disturbing Clown Horror in Contemporary Art",
    "Carnival of the Damned: Twisted Circus Aesthetics",
    "Glitched Nightmare Art and Digital Corruption",
    "Baroque Horror and Decadent Rot",
    "Sacrilegious Surrealism and Defiled Faith",
    "Decaying Grandeur and Eerie Processions",
    "Unsettling Theatrical Horror in Visual Art",
    "Demonic Puppetry and Possessed Automata",
    "Rave Horror: Neon Terror and Lurid Dystopias"
]
Python block 5
# Fetch result URLs for every seed keyword and persist the mapping to CSV.
data = []
for keyword in keywords:
    print(f"Fetching URLs for keyword: {keyword}")
    urls = get_search_results(keyword, num_results=50)  # Adjust num_results as needed
    for url in urls:
        data.append({"Keyword": keyword, "URL": url})
# Save the data to a DataFrame and export it as a CSV file
keyword_urls_df = pd.DataFrame(data)
keyword_urls_df.to_csv('keyword_urls.csv', index=False)
print("Data has been saved to keyword_urls.csv")
Python block 6
# Reload the keyword->URL mapping from disk and report the row count.
keyword_urls_df = pd.read_csv('keyword_urls.csv')
len(keyword_urls_df)
Python block 7
# Drop exact duplicate (Keyword, URL) rows in place.
keyword_urls_df.drop_duplicates(inplace=True)
Python block 8
# Display the de-duplicated keyword/URL table.
keyword_urls_df
Python block 9
def is_scraping_allowed(url, user_agent='YourCustomUserAgent/1.0', timeout=10):
    """Check the host's robots.txt to see whether `user_agent` may fetch `url`.

    Any failure to retrieve robots.txt — missing file, timeout, or other
    network error — is treated permissively and scraping is assumed allowed.
    """
    parts = urlparse(url)
    robots_url = f"{parts.scheme}://{parts.netloc}/robots.txt"
    parser = RobotFileParser()
    try:
        resp = requests.get(robots_url, headers={'User-Agent': user_agent}, timeout=timeout)
        if resp.status_code != 200:
            # No usable robots.txt — default to permissive.
            print(f"No robots.txt found at {robots_url}. Assuming scraping is allowed.")
            return True
        # Feed the retrieved rules into the parser and ask it directly.
        parser.parse(resp.text.splitlines())
        return parser.can_fetch(user_agent, url)
    except requests.exceptions.Timeout:
        print(f"Timeout fetching robots.txt from {robots_url}. Assuming scraping is allowed.")
        return True
    except Exception as e:
        print(f"Error fetching robots.txt from {robots_url}: {e}. Assuming scraping is allowed.")
        return True
def fetch_url_content(url):
    """Fetch a URL and extract its title, meta description, text, headings and alt text.

    Returns a dict with keys 'title', 'meta_description', 'content',
    'h_tags_text' and 'image_alt_text', or None when the page cannot be
    fetched, exceeds 1 MB, or cannot be parsed.
    """
    headers = {'User-Agent': 'YourCustomUserAgent/1.0'}
    print(f"Starting fetch for {url}")
    try:
        # Send GET request to the URL.
        print(f"Sending GET request to {url}")
        response = requests.get(url, headers=headers, timeout=15)
        print(f"Received response for {url} with status code {response.status_code}")
        response.raise_for_status()  # Raise exception for HTTP errors.
        content = response.text
        print(f"Fetched content for {url}, size: {len(content)} bytes")
        max_content_size = 1 * 1024 * 1024  # Limit content size to 1 MB.
        if len(content) > max_content_size:
            print(f"Content too large for {url}, skipping.")
            return None
        try:
            # Parse HTML content using lxml parser.
            print(f"Parsing content for {url}")
            soup = BeautifulSoup(content[:max_content_size], 'lxml')
            print(f"Parsed content for {url}")
        except Exception as e:
            print(f"Error parsing content from {url}: {e}")
            return None
        # Extract page title. get_text() is None-safe, unlike soup.title.string,
        # which is None when <title> contains nested markup and would crash .strip().
        title = soup.title.get_text(strip=True) if soup.title else ""
        # Extract meta description (empty string when the tag or attr is absent).
        meta_description_tag = soup.find('meta', {'name': 'description'})
        meta_description = (
            meta_description_tag.get('content', '').strip()
            if meta_description_tag
            else ""
        )
        # Extract main text content.
        text_content = soup.get_text(separator=' ', strip=True)
        # Extract text from header tags.
        h_tags_text = [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])]
        # Extract alternative text from images.
        image_alt_text = [img.get('alt', '').strip() for img in soup.find_all('img')]
        print(f"Finished processing {url}")
        return {
            'title': title,
            'meta_description': meta_description,
            'content': text_content,
            'h_tags_text': h_tags_text,
            'image_alt_text': image_alt_text
        }
    except Exception as e:
        print(f"Error fetching content from {url}: {e}")
        return None
def process_row(row):
    """Scrape one (Keyword, URL) DataFrame row into a flat record dict.

    URLs blocked by robots.txt are appended to the module-level
    `disallowed_urls` list and yield an empty record.
    """
    keyword, url = row['Keyword'], row['URL']
    print(f"Processing keyword: {keyword} - URL: {url}")
    # Baseline record returned when the page cannot be scraped.
    record = {
        "keyword": keyword,
        "url": url,
        "title": None,
        "meta_description": None,
        "content": "",
        "h_tags_text": [],
        "image_alt_text": []
    }
    # Respect robots.txt before touching the page.
    if not is_scraping_allowed(url):
        print(f"Scraping disallowed by robots.txt for {url}")
        disallowed_urls.append(url)
        return record
    fetched = fetch_url_content(url)
    if fetched:
        # Overlay the scraped fields onto the baseline record.
        record.update(
            title=fetched.get('title'),
            meta_description=fetched.get('meta_description'),
            content=fetched.get('content'),
            h_tags_text=fetched.get('h_tags_text'),
            image_alt_text=fetched.get('image_alt_text'),
        )
    return record
# Scrape every (keyword, URL) pair sequentially, honouring robots.txt,
# and collect the per-page records into a DataFrame.
scraped_data = []
disallowed_urls = []  # URLs blocked by robots.txt (filled by process_row).
total_urls = len(keyword_urls_df)
for index, row in keyword_urls_df.iterrows():
    data = process_row(row)
    scraped_data.append(data)
    print(f"{len(scraped_data)}/{total_urls}")  # progress indicator
# Convert the scraped data into a DataFrame.
extracted_content_df = pd.DataFrame(scraped_data)
# Save disallowed URLs to a CSV file.
disallowed_urls_df = pd.DataFrame(disallowed_urls, columns=['URL'])
disallowed_urls_df.to_csv('disallowed_urls.csv', index=False)
extracted_content_df
Python block 10
# Preview the scraped dataset.
extracted_content_df
Python block 11
# Persist the raw scrape before any enrichment.
extracted_content_df.to_csv('extracted_content_df_initial.csv', index = False)
Load initial scraped data
Python block 12
# Reload the raw scrape from disk.
extracted_content_df = pd.read_csv('extracted_content_df_initial.csv')
Python block 13
# Show every URL that robots.txt disallowed.
# NOTE(review): disallowed_urls_df only exists if the scraping cell above ran
# in this session — it is not reloaded from disallowed_urls.csv here.
with pd.option_context('display.max_rows', None, 'display.max_colwidth', None, 'display.html.use_mathjax', False):
    display(disallowed_urls_df)
Python block 14
# Coerce content (including any NaN from the CSV round-trip) to strings.
extracted_content_df['content'] = extracted_content_df['content'].astype(str)
Python block 15
# Size selector for the pretrained T5 checkpoint (resolves to 't5-3b').
TRAINING_DATA = '3b'
# Use the new tokenizer behavior by setting legacy=False
tokenizer = T5Tokenizer.from_pretrained(
    f't5-{TRAINING_DATA}',
    model_max_length=512,
    legacy=False
)
# Load the matching generation model; set force_download=True to re-fetch the weights.
model = T5ForConditionalGeneration.from_pretrained(
    f't5-{TRAINING_DATA}',
    force_download=False  # Set to False if you don't want to force a new download
)
Python block 16
# Running counter of summaries generated (progress logging only).
summary_n = 0

def generate_summary_t5_single_article(article):
    """Generate an abstractive summary of one article with the global T5 model.

    Returns None for empty/falsy input. Moves the model to GPU when available
    and clears the CUDA cache before and after generation.
    """
    global summary_n
    if not article:
        return None
    torch.cuda.empty_cache()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    print(f"Using device: {device}", summary_n)
    summary_n+=1
    # Tokenize with the T5 summarisation prefix; truncate long articles to 1024 tokens.
    inputs = tokenizer.encode("summarize: " + article, return_tensors="pt", max_length=1024, truncation=True).to(device)
    # NOTE(review): temperature is passed to generate() below, but without
    # do_sample=True beam search likely ignores it — confirm the intended effect.
    temperature = 1
    # Wider beam search for a more extensive candidate search.
    num_beams = 5
    # No minimum summary length is enforced.
    min_length = 0
    summary_ids = model.generate(inputs, num_beams=num_beams, min_length=min_length, max_length=500, length_penalty=1, early_stopping=False, temperature=temperature)
    summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
    torch.cuda.empty_cache()
    return summary
Python block 17
# Abstractive T5 summary for every page's content (slow; GPU-bound).
extracted_content_df['summary'] = extracted_content_df.content.apply(generate_summary_t5_single_article)
Python block 18
# Normalise summaries (including None for empty articles) to strings.
extracted_content_df['summary'] = extracted_content_df['summary'].astype(str)
Python block 19
# Free the T5 model/tokenizer and reclaim GPU memory.
del tokenizer, model
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()
Python block 20
# Sentence-embedding model (on GPU) and the KeyBERT extractor built on it.
model = SentenceTransformer('all-MPNet-base-v2', device='cuda')
kw_model = KeyBERT(model=model)
Python block 21
# Progress counter for extract_unique_keywords calls.
extract_unique_keywords_n = 1

def extract_unique_keywords(text):
    """Extract KeyBERT keyphrases from `text` and flatten them to unique words.

    Uses the global `kw_model` (KeyBERT). Returns a single space-separated
    string of de-duplicated words, ordered by first appearance within the
    similarity-ranked keyphrases.
    """
    global extract_unique_keywords_n
    print(extract_unique_keywords_n)
    extract_unique_keywords_n+=1
    # Extract up to 25 keyphrases of 1-15 words each.
    keywords = kw_model.extract_keywords(
        text,
        keyphrase_ngram_range=(1, 15),
        stop_words='english',
        top_n=25,
        use_maxsum=False,
        nr_candidates=200
    )
    # Keyphrase/score pairs into a DataFrame.
    df = pd.DataFrame(keywords, columns=['keywords', 'cosine_similarity'])
    # Split multi-word keyphrase strings into token lists.
    df['keywords'] = df['keywords'].apply(lambda x: x.split())
    # Highest-similarity phrases first, so their words win the dedup below.
    df.sort_values(by='cosine_similarity', ascending=False, inplace=True)
    # Re-join the tokens in each row.
    df['keywords_tidied'] = df['keywords'].apply(lambda words: " ".join(words))
    # Combine all tidied keyphrases into one string.
    combined_text = ' '.join(df['keywords_tidied'].astype(str))
    # Split the combined text into individual words.
    words = combined_text.split()
    # Remove duplicate words while preserving order.
    seen = set()
    unique_words = []
    for word in words:
        if word not in seen:
            seen.add(word)
            unique_words.append(word)
    # Join the unique words into a single string.
    unique_text = ' '.join(unique_words)
    return unique_text
Python block 22
# KeyBERT keyword extraction over every page's content.
extracted_content_df['keywords_keybert'] = extracted_content_df.content.apply(extract_unique_keywords)
Python block 23
# Free KeyBERT and the embedding model; reclaim GPU memory.
del kw_model, model
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()
Python block 24
# Whitespace-token word count per page (0 for non-string content).
extracted_content_df['word_count'] = extracted_content_df['content'].apply(lambda x: len(x.split()) if isinstance(x, str) else 0)
Python block 25
# Full-width preview of summaries, keywords and word counts.
with pd.option_context('display.max_rows', None, 'display.max_colwidth', None, 'display.html.use_mathjax', False):
    display(extracted_content_df[['url', 'summary', 'keywords_keybert', 'word_count']])
Python block 26
# File extensions that indicate a downloadable file rather than a webpage.
excluded_extensions = ['.pdf', '.jpg', '.png', '.docx', '.zip']

def is_proper_webpage(url):
    """Return True unless the URL path ends in a known non-webpage extension.

    The comparison is case-insensitive and looks only at the path component,
    so query strings and fragments do not affect the result.
    """
    path = urlparse(url).path.lower()
    # str.endswith accepts a tuple of suffixes, replacing the explicit loop.
    return not path.endswith(tuple(excluded_extensions))
Python block 27
# Keep only rows whose URL is a proper webpage (drop file-download links).
extracted_content_df = extracted_content_df[extracted_content_df['url'].apply(is_proper_webpage)].reset_index(drop=True)
Python block 28
class ExtractiveSummarizer:
    """Centroid-based extractive summariser using sentence embeddings.

    Sentences are embedded with a SentenceTransformer, ranked by cosine
    similarity to the mean (document) embedding, and the top N are joined.
    """
    def __init__(self):
        # Load the embedding model once; 'cuda' is hard-coded, so a GPU is required.
        self.model = SentenceTransformer('paraphrase-MiniLM-L6-v2', device='cuda')
        # Number of summarize() calls made so far (progress logging only).
        self.counter = 0
    def summarize(self, text, n_sentences=50):
        """Return the `n_sentences` sentences of `text` closest to its centroid.

        Output sentences follow similarity rank, not original document order.
        Empty input yields "".
        """
        # Log and advance the call counter.
        print(f"Processing call number: {self.counter}")
        self.counter += 1
        # Step 1: Sentence Tokenization
        sentences = nltk.tokenize.sent_tokenize(text)
        # Guard: nothing to summarise.
        if len(sentences) == 0:
            return ""
        # Step 2: embed sentences, batched to limit GPU memory use.
        batch_size = 512  # Reduced batch size to prevent OOM error
        sentence_embeddings = self.model.encode(
            sentences,
            batch_size=batch_size,
            show_progress_bar=True
        )
        # Step 3: document embedding = mean of all sentence embeddings.
        document_embedding = np.mean(sentence_embeddings, axis=0)
        # Step 4: cosine similarity of each sentence to the centroid.
        similarities = cosine_similarity(
            [document_embedding], sentence_embeddings
        ).flatten()
        # Step 5: rank sentences by descending similarity.
        ranked_indices = similarities.argsort()[::-1]
        ranked_sentences = [sentences[i] for i in ranked_indices]
        # Step 6: keep the top N and join into a single string.
        summary = " ".join(ranked_sentences[:n_sentences])
        return summary
# Instantiate the summarizer (loads the embedding model onto the GPU).
summarizer = ExtractiveSummarizer()
Python block 29
# 25-sentence extractive summary of each page's content.
extracted_content_df['extractive_summary'] = extracted_content_df.content.apply(lambda x: summarizer.summarize(x, n_sentences=25))
Python block 30
# Free the extractive summarizer; reclaim GPU memory.
del summarizer
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()
Python block 31
class KeyPhraseTransformer:
    """Key-phrase extraction via the snrspeaks/KeyPhraseTransformer seq2seq model.

    The input document is split into token-bounded paragraphs, each paragraph
    is run through the model, and the generated " | "-delimited phrases are
    de-duplicated and filtered against the source text.
    """
    def __init__(self, model_type: str = "t5", model_name: str = "snrspeaks/KeyPhraseTransformer"):
        # Prefer GPU when available.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Load the model matching the requested architecture.
        # NOTE(review): an unrecognised model_type leaves self.model undefined —
        # confirm callers only ever pass "t5" or "mt5".
        if model_type == "t5":
            self.model = T5ForConditionalGeneration.from_pretrained(model_name).to(self.device)
        elif model_type == "mt5":
            self.model = MT5ForConditionalGeneration.from_pretrained(model_name).to(self.device)
        # Matching tokenizer.
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
    @staticmethod
    def split_into_paragraphs(doc: str, max_tokens_per_para: int = 128, tokenizer=None):
        """Greedily pack sentences into paragraphs of at most ~max_tokens_per_para tokens.

        Uses `tokenizer` for token counts when given, else NLTK word tokens.
        Returns a list of non-empty paragraph strings.
        """
        # Coerce non-string input (e.g. NaN / None) to a string.
        if not isinstance(doc, str):
            doc = str(doc) if doc is not None else ""
        # Split document into sentences.
        sentences = sent_tokenize(doc.strip())
        temp = ""        # running text of the current paragraph (for token counting)
        temp_list = []   # sentences in the current paragraph
        final_list = []  # completed paragraphs
        for i, sentence in enumerate(sentences):
            temp += " " + sentence
            wc_temp = len(tokenizer.tokenize(temp) if tokenizer else word_tokenize(temp))
            if wc_temp < max_tokens_per_para:
                # Still under budget: keep accumulating.
                temp_list.append(sentence)
                if i == len(sentences) - 1:
                    final_list.append(" ".join(temp_list))
            else:
                # Budget exceeded: close the current paragraph and start a
                # new one containing only this sentence.
                final_list.append(" ".join(temp_list))
                temp = sentence
                temp_list = [sentence]
                if i == len(sentences) - 1:
                    final_list.append(" ".join(temp_list))
        # Return non-empty paragraphs only.
        return [para for para in final_list if len(para.strip()) != 0]
    def process_outputs(self, outputs):
        """Flatten per-paragraph generations into one de-duplicated phrase list."""
        # Each generation is a " | "-delimited phrase string.
        temp = [output[0].split(" | ") for output in outputs]
        flatten = [item for sublist in temp for item in sublist]
        # Unique phrases, preserving first-seen order.
        return sorted(set(flatten), key=flatten.index)
    def filter_outputs(self, key_phrases, text):
        """Keep phrases whose words all occur in `text` or in the NLTK word list.

        NOTE(review): words.words() and word_tokenize(text) are re-evaluated
        inside the nested loop, which is very slow on long inputs; hoisting
        them into sets would not change the results.
        """
        # Work in lower case throughout.
        key_phrases = [elem.lower() for elem in key_phrases]
        text = text.lower()
        valid_phrases = []
        invalid_phrases = []
        for phrases in key_phrases:
            for phrase in word_tokenize(phrases):
                if (phrase in word_tokenize(text)) or (phrase in words.words()):
                    if phrases not in valid_phrases:
                        valid_phrases.append(phrases)
                else:
                    invalid_phrases.append(phrases)
        # A phrase survives only if none of its words were flagged invalid.
        return [elem for elem in valid_phrases if elem not in invalid_phrases]
    @staticmethod
    def predict(model, tokenizer, doc: str, device):
        """Run beam-search generation on one paragraph; returns decoded strings."""
        # Encode the paragraph into model input format.
        input_ids = tokenizer.encode(doc, return_tensors="pt", add_special_tokens=True).to(device)
        generated_ids = model.generate(
            input_ids=input_ids,
            num_beams=2,
            max_length=512,
            repetition_penalty=2.5,
            length_penalty=1,
            early_stopping=True,
            top_p=0.95,  # NOTE(review): sampling args without do_sample=True — likely inert; confirm.
            top_k=50,
            num_return_sequences=1,
        )
        # Decode generated token ids back into text.
        preds = [tokenizer.decode(g, skip_special_tokens=True, clean_up_tokenization_spaces=True) for g in generated_ids]
        return preds
    def get_key_phrases(self, text: str, text_block_size: int = 64):
        """Extract, de-duplicate and filter key phrases for `text`; returns them space-joined."""
        results = []
        # Split the input text into token-bounded paragraphs.
        paras = self.split_into_paragraphs(doc=text, max_tokens_per_para=text_block_size, tokenizer=self.tokenizer)
        # One generation per paragraph.
        for para in paras:
            results.append(self.predict(self.model, self.tokenizer, para, self.device))
        # Flatten, de-duplicate and filter the generations.
        key_phrases = self.filter_outputs(self.process_outputs(results), text)
        return ' '.join(key_phrases)
Python block 32
# Load the key-phrase extraction model.
model_type = "t5"  # or "mt5" depending on your preference
model_name = "snrspeaks/KeyPhraseTransformer"
key_phrase_transformer = KeyPhraseTransformer(model_type=model_type, model_name=model_name)
Python block 33
# Progress counter shared across kp() calls.
# (The original cell assigned kp_n = 1 twice; the duplicate is removed.)
kp_n = 1

def kp(text):
    """Extract key phrases from `text` via the global KeyPhraseTransformer.

    Prints a running call counter; returns None when extraction fails so a
    pandas .apply() over a column keeps going past bad rows.
    """
    global kp_n
    try:
        print(kp_n)
        kp_n += 1
        return key_phrase_transformer.get_key_phrases(text, text_block_size=64)
    except Exception as e:
        print(f"Error during key phrase extraction: {e}")
        return None
Python block 34
def extract_pos(text):
    """Lower-case `text`, POS-tag it, and keep only the nouns.

    Downloads the required NLTK resources on first use. Returns the kept
    words space-joined with punctuation stripped; None input yields "".
    """
    if text is None:
        return ""
    # Ensure the sentence tokenizer and POS tagger data are present.
    try:
        nltk.data.find('tokenizers/punkt')
    except LookupError:
        nltk.download('punkt', quiet=True)
    try:
        nltk.data.find('taggers/averaged_perceptron_tagger')
    except LookupError:
        nltk.download('averaged_perceptron_tagger', quiet=True)
    tokens = word_tokenize(text.lower())
    tagged = pos_tag(tokens, tagset='universal')
    # POS tags to keep — nouns only at present; other classes left for re-enabling.
    desired_tags = {
        'NOUN',
        #'VERB',
        #'ADJ',
        #'ADV'
    }
    # Keep tokens with desired tags, dropping possessive/punctuation artefacts.
    filtered_words = [
        word for word, tag in tagged
        if tag in desired_tags and word not in {'s', "'s", ",", ""}
    ]
    # Strip any remaining non-word characters from the kept tokens.
    cleaned_words = [re.sub(r'[^\w\s]', '', word) for word in filtered_words if word.strip()]
    return ' '.join(cleaned_words)
Python block 35
def remove_to_be(input_string):
    """Strip every conjugated form of "to be" from a whitespace-delimited string."""
    forms = {"am", "is", "are", "was", "were", "be", "being", "been"}
    # Case-insensitive membership test, original casing preserved for survivors.
    kept = (token for token in input_string.split() if token.lower() not in forms)
    return ' '.join(kept)
Python block 36
# Key phrases from the abstractive summaries.
extracted_content_df['summary_keyphrases'] = extracted_content_df.summary.apply(kp)
Python block 37
# Keep only the nouns from the summary key phrases.
extracted_content_df['summary_keyphrases_tidied'] = extracted_content_df.summary_keyphrases.apply(extract_pos)
Python block 38
# Also strip "to be" forms from the tidied summary key phrases.
extracted_content_df['summary_keyphrases_tidied'] = extracted_content_df.summary_keyphrases_tidied.apply(remove_to_be)
Python block 39
# Key phrases from the extractive summaries.
extracted_content_df['extractive_summary_keyphrases'] = extracted_content_df.extractive_summary.apply(kp)
Python block 40
# NOTE(review): this POS-filters the raw extractive_summary, not the
# extractive_summary_keyphrases column built in the previous cell — the
# summary pipeline above filters its keyphrase column instead; confirm intent.
extracted_content_df['extractive_summary_keyphrases_tidied'] = extracted_content_df.extractive_summary.apply(extract_pos)
Python block 41
# Replace missing meta descriptions with empty strings.
extracted_content_df['meta_description'] = extracted_content_df['meta_description'].replace({pd.NA: "", np.nan: ""})
Python block 42
# Noun-only version of the meta descriptions (cast to str defensively first).
extracted_content_df['meta_description_tidied'] = extracted_content_df.meta_description.astype(str).apply(extract_pos)
Python block 43
# Key phrases from the meta descriptions.
extracted_content_df['meta_description_keyphrases'] = extracted_content_df.meta_description.apply(kp)
Python block 44
# NOTE(review): this overwrites the 'meta_description_keyphrases' column just
# computed by kp() in the previous cell with a plain POS-filter of
# meta_description, discarding the kp() result — confirm this assignment
# targets the intended column.
extracted_content_df['meta_description_keyphrases'] = extracted_content_df.meta_description.apply(extract_pos)
Python block 45
def clean_text_list(text_list):
    """Join a list of strings with spaces, dropping blanks and stripping punctuation.

    Non-list input (e.g. NaN from a CSV round-trip) yields an empty string.
    """
    if not isinstance(text_list, list):
        return ""
    # Keep only entries with non-whitespace content, trimmed.
    non_blank = [entry.strip() for entry in text_list if entry.strip()]
    joined = " ".join(non_blank)
    # One C-level pass removes all ASCII punctuation.
    return joined.translate(str.maketrans('', '', string.punctuation))
Python block 46
# Flatten heading-tag text lists into cleaned strings.
extracted_content_df['h_tags_tidied'] = extracted_content_df['h_tags_text'].apply(clean_text_list)
Python block 47
# Keep only nouns from the heading text.
extracted_content_df['h_tags_tidied'] = extracted_content_df['h_tags_tidied'].apply(extract_pos)
Python block 48
# Flatten image alt-text lists into cleaned strings.
extracted_content_df['image_alt_text_tidied'] = extracted_content_df.image_alt_text.apply(clean_text_list)
Python block 49
# Keep only nouns from the alt text.
extracted_content_df['image_alt_text_tidied'] = extracted_content_df.image_alt_text_tidied.apply(extract_pos)
Python block 50
# NOTE(review): derived from the raw meta_description rather than the
# meta_description_keyphrases column — confirm the intended source column.
extracted_content_df['meta_description_keyphrases_tidied'] = extracted_content_df.meta_description.apply(extract_pos)
Python block 51
# Strip "to be" forms from the tidied meta keyphrases.
extracted_content_df['meta_description_keyphrases_tidied'] = extracted_content_df.meta_description_keyphrases_tidied.apply(remove_to_be)
Python block 52
# Free the key-phrase model; reclaim GPU memory.
del key_phrase_transformer
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()
Python block 53
# Noun-only version of the full page content.
extracted_content_df['tidied_content'] = extracted_content_df.content.apply(extract_pos)
Python block 54
def remove_duplicates(input_string):
    """Drop repeated words, keeping the first occurrence of each (order preserved)."""
    # dict preserves insertion order, so its keys are the de-duplicated words.
    return ' '.join(dict.fromkeys(input_string.split()))
Python block 55
# De-duplicated word sequence of the tidied content.
extracted_content_df['tidied_content_remove_duplicates'] = extracted_content_df.tidied_content.apply(remove_duplicates)
Python block 56
# Noun-only version of the abstractive summaries.
extracted_content_df['tidied_summary'] = extracted_content_df.summary.apply(extract_pos)
Python block 57
# Strip "to be" forms from the tidied summaries.
extracted_content_df['tidied_summary'] = extracted_content_df.tidied_summary.apply(remove_to_be)
Python block 58
# Persist the fully-enriched dataset.
extracted_content_df.to_csv('extracted_content_df.csv', index = False)
Load data
Python block 59
# Reload the enriched dataset from disk.
extracted_content_df = pd.read_csv('extracted_content_df.csv')
Python block 60
# Ten longest pages by word count.
extracted_content_df.word_count.sort_values(ascending=False).head(10)
Python block 61
import matplotlib.pyplot as plt  # NOTE: already imported at the top of the file.
# Bar chart of per-page word counts, longest first (x labels suppressed).
sorted_data = extracted_content_df.word_count.sort_values(ascending=False)
plt.figure(figsize=(12, 6))
sorted_data.plot(kind='bar', width = 5)
plt.xticks([])
plt.show()
Python block 62
def plot_most_common_words(df, column, n=20):
    """Bar-plot the `n` most frequent words (>=2 alphabetic chars) in df[column]."""
    # Step 1: join all text entries into a single lowercase corpus.
    all_text = ' '.join(df[column].dropna().astype(str).tolist())
    # Step 2: keep only alphabetic words of two or more letters.
    words = re.findall(r'\b[a-zA-Z]{2,}\b', all_text.lower())
    # Step 3: count occurrences of each word.
    word_counts = Counter(words)
    # Step 4: take the n most common words.
    most_common_words = word_counts.most_common(n)
    # Step 5: bar chart of the top-n counts.
    plt.figure(figsize=(25, 8))
    plt.bar(range(len(most_common_words)), [count for word, count in most_common_words], align='center', color='crimson')
    # Rotated, right-aligned labels keep long words readable under their bars.
    plt.xticks(
        range(len(most_common_words)),
        [word for word, count in most_common_words],
        rotation=60,
        fontsize=10,
        ha='right'  # Align labels to the right, so the last letter aligns with the bar center
    )
    plt.ylabel('Frequency')
    plt.xlabel('Words')
    plt.title('Most Common Words')
    plt.show()
Python block 63
def create_network_plot(text_series, threshold_value, xval, yval):
    """Build a word co-occurrence network from `text_series`, plot its Louvain
    communities, and return the top-20 words per community.

    Words co-occurring in the same document are connected; nodes below the
    `threshold_value` percentile of degree centrality are pruned before
    plotting a figure of size (xval, yval). Returns a DataFrame holding, for
    each community label, its top words, frequencies and centrality scores.
    """
    # Tokenise the documents into a bag-of-words matrix.
    vectorizer = CountVectorizer()
    X = vectorizer.fit_transform(text_series)
    feature_names = vectorizer.get_feature_names_out()
    # Per-document list of unique words.
    nodes_list = [list(set([feature_names[i] for i in X.getrow(doc_idx).indices])) for doc_idx in range(X.shape[0])]
    def create_undirected_graph(nodes_list):
        # Connect every pair of words appearing in the same document.
        G = nx.Graph()  # Create an empty undirected graph
        for nodes in nodes_list:
            for i in range(len(nodes)):
                for j in range(i + 1, len(nodes)):
                    G.add_edge(nodes[i], nodes[j])
        return G
    G = create_undirected_graph(nodes_list)
    # Escape '$' so matplotlib does not treat node labels as math text.
    G = nx.relabel_nodes(G, {n: n.replace('$', r'\$') for n in G.nodes()})
    # Centrality on the FULL graph — computed before pruning and used below
    # for the per-word scores in the returned DataFrame.
    centrality = nx.degree_centrality(G)
    plt.figure(figsize=(xval, yval))
    degree_centrality = nx.degree_centrality(G)
    # Prune nodes below the requested centrality percentile.
    threshold = np.percentile(list(degree_centrality.values()), threshold_value)
    G.remove_nodes_from([node for node in G.nodes() if degree_centrality[node] < threshold])
    # Node size proportional to (pre-pruning) degree centrality.
    new_node_sizes = [degree_centrality[node] * 50000 for node in G.nodes() if degree_centrality[node] >= threshold]
    # Louvain community detection; node colour encodes the community.
    partition = community_louvain.best_partition(G, resolution=1)
    colors = [partition[node] for node in G.nodes()]
    nx.draw_networkx(G, node_size=new_node_sizes, node_color=colors, edge_color='#d3d3d3',
                     font_color='black', font_weight="heavy", font_size=10,
                     labels={node: node for node in G.nodes()}, cmap='rainbow')
    plt.grid()
    plt.show()
    # Group surviving nodes by their community label.
    community_dict = {}
    for node, label in partition.items():
        if label not in community_dict:
            community_dict[label] = []
        community_dict[label].append(node)
    # One column per community, padded to 20 entries.
    df = pd.DataFrame(columns=list(community_dict.keys()))
    for label in community_dict:
        nodes_counts = Counter(community_dict[label])
        top_nodes = nodes_counts.most_common(20)
        top_nodes = [x[0] for x in top_nodes]
        top_nodes = top_nodes + [""] * (20 - len(top_nodes))  # pad to length 20
        df[label] = top_nodes
    def get_majority_label(doc_idx):
        # Most common community among a document's surviving words (None if none survive).
        nodes = nodes_list[doc_idx]
        labels = [partition[node] for node in nodes if node in partition]
        return Counter(labels).most_common(1)[0][0] if labels else None
    community_labels = [get_majority_label(i) for i in range(len(nodes_list))]
    # Original documents paired with their majority community label.
    community_df = pd.DataFrame({'original_data': text_series.tolist(), 'community_label': community_labels})
    community_df = community_df.dropna()  # drop documents with no surviving words
    community_df['community_label'] = community_df['community_label'].astype(int)
    # community_df.to_csv('community_labels.csv', index=False)
    # Per-community word-frequency tables.
    community_freq_dfs = []
    for label in community_df['community_label'].unique():
        subset_df = community_df[community_df['community_label'] == label]
        vectorizer = CountVectorizer()
        word_counts = vectorizer.fit_transform(subset_df['original_data'])
        words = vectorizer.get_feature_names_out()
        word_frequencies = word_counts.sum(axis=0).tolist()[0]
        # Frequency plus the full-graph centrality score for each word.
        word_freq_dict = {
            f'{label}_word': words,
            f'{label}_frequency': word_frequencies,
            f'{label}_centrality': [centrality.get(word, 0) for word in words]  # Map centrality scores
        }
        word_freq_df = pd.DataFrame(word_freq_dict).sort_values(by=f'{label}_frequency', ascending=False)
        word_freq_df.reset_index(drop=True, inplace=True)
        community_freq_dfs.append(word_freq_df)
    final_df = pd.concat(community_freq_dfs, axis=1)
    # Frequency columns back to int (NaN from the ragged concat becomes 0).
    for col in final_df.columns:
        if 'frequency' in col:  # Target only frequency columns
            final_df[col] = final_df[col].fillna(0).astype(int)
    pd.set_option('display.max_columns', None)
    return final_df.head(20)
Python block 64
# Timestamp for this stage of the run.
time.strftime('%Y-%m-%d %H:%M', time.localtime())
Select column
Python block 65
# Dropdown to choose which DataFrame column feeds the later analyses.
column_selector = widgets.Dropdown(
    options=extracted_content_df.columns.tolist(),
    description='Select Column:',
    disabled=False,
)
# Display the dropdown widget.
display(column_selector)
# Button that commits the selection.
confirm_button = widgets.Button(description="Assign Column")
# Holds the selected column's data (a Series) once assigned.
selected_column_data = None
def assign_column_to_object(button):
    # Click handler: store the chosen column with NA/None rows removed.
    global selected_column_data
    selected_column_data = extracted_content_df[column_selector.value].dropna()
    print(f"Assigned '{column_selector.value}' to selected_column_data (NA/None values removed).")
# Wire the handler to the button and show it.
confirm_button.on_click(assign_column_to_object)
display(confirm_button)
Word frequency
Python block 66
# Top-100 most common words in the selected column.
plot_most_common_words(extracted_content_df, selected_column_data.name, 100)
Network analysis
Python block 67
# Co-occurrence network of the selected column (0th-percentile threshold = no pruning).
warnings.filterwarnings("ignore")
create_network_plot(selected_column_data, 0, 15, 15)
Apriori
Python block 68
def apriori_analysis(df, column, keyword, min_confidence, min_lift):
    """Mine association rules involving `keyword` from the text in df[column].

    Rows containing the keyword are one-hot encoded word-by-word; a minimum
    support level is searched for automatically (lowest support that still
    yields itemsets, found within a ~10 s budget); then mlxtend's apriori and
    association_rules produce rules filtered by `min_confidence`, `min_lift`
    and the presence of `keyword` on either side of each rule.

    Returns a DataFrame with 'associated_words', 'support', 'confidence', 'lift'.
    Raises ValueError if no workable support level is found in time.
    """
    # Step 1: Subset the data to only rows containing the keyword
    df_filtered = df[df[column].str.contains(keyword, na=False)]
    # Step 2: one-hot encode each row's whitespace-split words.
    transactions = df_filtered[column].str.split()
    unique_keywords = set(item for sublist in transactions for item in sublist)
    one_hot_df = pd.DataFrame([{word: (word in transaction) for word in unique_keywords} for transaction in transactions])
    # Candidate support values collected by the search below.
    temp_list = []
    def find_min_support():
        # Repeatedly lower the support until itemsets appear, recording each
        # support level that produced a non-empty result.
        nonlocal temp_list  # Ensure we're modifying the outer temp_list
        lowest_support = 1
        start_time_inner = time.time()
        while True:
            # Generate frequent itemsets with the current support.
            frequent_itemsets = apriori(one_hot_df, min_support=lowest_support, use_colnames=True)
            if not frequent_itemsets.empty:
                temp_list.append(lowest_support)
            # Coarse steps above 0.01, fine steps below it.
            if lowest_support > 0.01:
                lowest_support -= 0.01
            else:
                lowest_support -= 0.001
            if lowest_support <= 0:
                lowest_support = 0.001
            # Every ~0.5 s of inner-loop time, check the overall 10 s budget.
            if time.time() - start_time_inner > 0.5:
                start_time_inner = time.time()  # Reset inner loop timer
                if time.time() - start_time >= 10:
                    break  # Ensure total function doesn't exceed 10 seconds
    # Run the search under a hard 10-second timeout.
    start_time = time.time()
    try:
        func_timeout(10, find_min_support)
    except FunctionTimedOut:
        pass
    # Pick a support slightly above the lowest found, when enough candidates exist.
    if len(temp_list) > 3:
        SUPPORT = temp_list[-3]
    elif len(temp_list) > 0:
        SUPPORT = temp_list[-1]
    else:
        # No support value found within the time limit
        raise ValueError("No support value found within the time limit.")
    print("Support: ", round(SUPPORT, 4))
    # Step 3: frequent itemsets at the chosen support.
    frequent_itemsets = apriori(one_hot_df, min_support=SUPPORT, use_colnames=True)
    # Step 4: rules filtered by confidence, lift and keyword membership.
    rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=min_confidence)
    filtered_rules = rules[
        (rules['lift'] >= min_lift) &
        (rules['antecedents'].apply(lambda x: keyword in x) | rules['consequents'].apply(lambda x: keyword in x))
    ]
    # Work on a copy, not a view, before adding the combined column.
    filtered_rules = filtered_rules.copy()
    # Merge both sides of each rule into one sorted, comma-separated string.
    filtered_rules['associated_words'] = filtered_rules.apply(
        lambda row: ', '.join(sorted(set(row['antecedents']) | set(row['consequents']))), axis=1
    )
    print(str(len(filtered_rules)) + " rows")
    # Return only the combined associated words and the headline metrics.
    return filtered_rules[['associated_words', 'support', 'confidence', 'lift']]
Python block 69
# List all available columns.
extracted_content_df.columns
Python block 70
# Preview the KeyBERT keywords column.
extracted_content_df[['keywords_keybert']]
Python block 71
# Parameters for the association-rule mining run.
column = selected_column_data.name
keyword = 'pop'
min_confidence = 0.1
min_lift = 1.0
print(column)
# Mine, rank by lift then support, and drop duplicate word sets.
results = apriori_analysis(extracted_content_df, column, keyword, min_confidence, min_lift)
results = results.sort_values(by=['lift', 'support'], ascending=False)
results.drop_duplicates(subset=['associated_words'], keep='first', inplace=True)
with pd.option_context('display.max_rows', None, 'display.max_colwidth', None, 'display.html.use_mathjax', False):
    display(results)
Python block 72
def subset_by_keywords(df, keywords):
    """Rows of `df` whose comma-separated 'associated_words' contain every keyword."""
    wanted = set(keywords)
    # issubset over the split word list replaces the `<=` set comparison.
    mask = df['associated_words'].map(lambda cell: wanted.issubset(cell.split(', ')))
    return df[mask]
Python block 73
# Show only the rules containing both 'fear' and 'macabre'.
keywords = ['fear', 'macabre']
filtered_results = subset_by_keywords(results, keywords)
with pd.option_context('display.max_rows', None, 'display.max_colwidth', None, 'display.html.use_mathjax', False):
    display(filtered_results)
This version removes Jupyter interface chrome, notebook layout scaffolding, runtime scripts, and application-level CSS so the code is exposed as primary document content.