Natural Language Processing
Lab Assignment
R.BhanuKiran
22BCE9560
L45+L46
1. Implement Relation Extraction from a corpus
import nltk
import pandas as pd
from nltk.tokenize import word_tokenize, sent_tokenize

# Download necessary resources
nltk.download('punkt', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)

def extract_relations(text):
    """Extract subject-relation-object (SRO) triples from a given text."""
    relations = []

    for sentence in sent_tokenize(text):
        tokens = word_tokenize(sentence)
        pos_tags = nltk.pos_tag(tokens)

        subject, relation, obj = None, None, None

        for token, tag in pos_tags:
            if tag.startswith('NN') and not subject:
                subject = token  # First noun is assumed to be the subject
            elif tag.startswith('VB') and subject and not relation:
                relation = token  # First verb after the subject is the relation
            elif tag.startswith('NN') and subject and relation:
                obj = token  # Next noun after the verb is the object
                relations.append({'sentence': sentence, 'subject': subject,
                                  'relation': relation, 'object': obj})
                subject, relation, obj = None, None, None  # Reset for a potential next triple

    return pd.DataFrame(relations, columns=['sentence', 'subject', 'relation', 'object'])

corpus = [
    "Elon Musk founded SpaceX.",
    "NASA launched the Artemis mission.",
    "OpenAI developed ChatGPT.",
    "Apple designs innovative products."
]

results = pd.concat([extract_relations(text) for text in corpus], ignore_index=True)

if not results.empty:
    print(results.to_string(index=False))
else:
    print("No valid subject-relation-object triples found.")
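As an illustrative check of the heuristic (hedged; the exact tags depend on the installed NLTK tagger), a single sentence from the corpus should yield one triple:

# Illustrative only: with the usual Penn Treebank tags ('Elon'/NNP,
# 'founded'/VBD, 'SpaceX'/NNP), one row is expected.
print(extract_relations("Elon Musk founded SpaceX.").to_string(index=False))
# Expected: subject 'Elon', relation 'founded', object 'SpaceX'
# (only the first noun is kept as the subject, so 'Musk' is dropped).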
Output:
2. Implement Event Extraction from the same corpus
import nltk
import pandas as pd
import re
from nltk.tokenize import word_tokenize, sent_tokenize

nltk.download('punkt', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)
nltk.download('maxent_ne_chunker', quiet=True)
nltk.download('words', quiet=True)
nltk.download('maxent_ne_chunker_tab', quiet=True)

def extract_events(text):
    """Extract events with participants, time, and location."""
    events = []
    for sentence in sent_tokenize(text):
        tokens = word_tokenize(sentence)
        pos_tags = nltk.pos_tag(tokens)
        named_entities = nltk.ne_chunk(pos_tags)
        time = extract_time(sentence)
        # Map each entity label (PERSON, ORGANIZATION, GPE, ...) to its text
        entities = {
            chunk.label(): ' '.join(w for w, _ in chunk.leaves())
            for chunk in named_entities if hasattr(chunk, 'label')
        }
        for word, tag in pos_tags:
            if tag.startswith('VB') and tag not in ['VBG', 'VBN']:
                events.append({
                    'sentence': sentence,
                    'event': word,
                    'category': categorize_event(word),
                    'participants': entities.get('PERSON', '') or entities.get('ORGANIZATION', ''),
                    'time': time,
                    'location': entities.get('GPE', '')
                })
    return pd.DataFrame(events, columns=['sentence', 'event', 'category',
                                         'participants', 'time', 'location'])

def extract_time(text):
    """Extract time-related expressions."""
    patterns = [
        r'\b\d{4}\b',
        r'\b(yesterday|today|tomorrow)\b',
        r'\b(last|next|this)\s+\w+\b'
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return match.group()
    return None

def categorize_event(verb):
    """Classify events based on verb meaning."""
    event_categories = {
        "Movement": {"go", "travel", "arrive", "leave"},
        "Communication": {"say", "announce", "report", "declare"},
        "Transaction": {"buy", "sell", "purchase", "acquire"},
        "Creation": {"make", "build", "develop", "invent"}
    }
    for category, verbs in event_categories.items():
        if verb.lower() in verbs:
            return category
    return "Other"

corpus = [
    "OpenAI developed a new language model last year.",
    "NASA announced a Mars mission yesterday.",
    "Tesla will launch a self-driving update next month in California.",
    "Microsoft acquired a gaming company in 2020.",
]

results = pd.concat([extract_events(text) for text in corpus], ignore_index=True)

if not results.empty:
    print(results.to_string(index=False))
else:
    print("No significant events extracted.")
Output:
3. Design Rule-based, Dictionary-based, and Machine Learning-based NER taggers
import nltk
import pandas as pd
import numpy as np
import re
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

nltk.download('punkt', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)
nltk.download('maxent_ne_chunker', quiet=True)
nltk.download('words', quiet=True)

corpus = [
    "Apple Inc. was founded by Steve Jobs in California in 1976.",
    "Microsoft developed Windows operating system in the United States.",
    "Amazon CEO Jeff Bezos visited their new headquarters in Seattle last week.",
    "Tesla's Elon Musk announced a new factory in Berlin, Germany.",
    "The European Union and the United Kingdom signed a trade deal in December 2020.",
    "Dr. Smith prescribed medication for John's condition at Mayo Clinic."
]

def rule_based_ner(text):
    """Extract named entities using regex-based rules."""
    entities = []
    patterns = {
        r'\b([A-Z][a-zA-Z]+(?: [A-Z][a-zA-Z]+)*) (Inc\.|Corp\.|Ltd\.|LLC|Company)\b': "ORG",
        r'\b(Mr\.|Mrs\.|Ms\.|Dr\.|Prof\.) ([A-Z][a-zA-Z]+(?: [A-Z][a-zA-Z]+)*)\b': "PERSON",
        r'\b(January|February|March|April|May|June|July|August|September|October|November|December) \d{1,2}(st|nd|rd|th)?, \d{4}\b': "DATE"
    }
    for pattern, label in patterns.items():
        matches = re.finditer(pattern, text)
        for match in matches:
            entities.append({"text": match.group(), "label": label, "method": "rule-based"})
    return entities

class DictionaryNER:
    """Named Entity Recognition using a pre-defined dictionary."""
    def __init__(self):
        self.dictionary = {
            "apple": "ORG", "microsoft": "ORG", "amazon": "ORG", "tesla": "ORG",
            "steve jobs": "PERSON", "jeff bezos": "PERSON", "elon musk": "PERSON",
            "california": "LOC", "seattle": "LOC", "berlin": "LOC", "germany": "LOC"
        }

    def find_entities(self, text):
        """Find named entities using dictionary-based matching."""
        entities = []
        text_lower = text.lower()
        for entity, label in self.dictionary.items():
            if entity in text_lower:
                start = text_lower.find(entity)
                end = start + len(entity)
                entities.append({"text": text[start:end], "label": label, "method": "dictionary-based"})
        return entities

class MLBasedNER:
    """NER using Machine Learning with Logistic Regression."""
    def __init__(self):
        self.vectorizer = CountVectorizer(analyzer='char_wb', ngram_range=(2, 5))
        self.model = LogisticRegression(max_iter=1000)
        self.is_trained = False

    def prepare_training_data(self, texts):
        """Prepare tokenized data with POS tagging and named-entity labels."""
        X, y = [], []
        for text in texts:
            tokens = word_tokenize(text)
            pos_tags = nltk.pos_tag(tokens)
            named_entities = nltk.ne_chunk(pos_tags)
            for token, pos in pos_tags:
                X.append(token)
                label = "O"
                for ne in named_entities:
                    if hasattr(ne, 'label') and isinstance(ne, nltk.tree.Tree):
                        for word, tag in ne.leaves():
                            if word == token:
                                label = ne.label()
                                break
                y.append(label)
        return X, y

    def train(self, texts):
        """Train the ML-based NER model."""
        X, y = self.prepare_training_data(texts)
        if len(set(y)) < 2:
            print("Not enough data to train ML model.")
            return
        X_features = self.vectorizer.fit_transform(X)
        self.model.fit(X_features, y)
        self.is_trained = True

    def predict(self, text):
        """Predict named entities in new text using the trained model."""
        if not self.is_trained:
            return []
        tokens = word_tokenize(text)
        X_test = self.vectorizer.transform(tokens)
        predicted_labels = self.model.predict(X_test)
        entities = [
            {"text": token, "label": label, "method": "ml-based"}
            for token, label in zip(tokens, predicted_labels) if label != 'O'
        ]
        return entities

dictionary_ner = DictionaryNER()
ml_ner = MLBasedNER()
ml_ner.train(corpus)

for sentence in corpus:
    print("Sentence:", sentence)
    print("Rule-based NER:", rule_based_ner(sentence))
    print("Dictionary-based NER:", dictionary_ner.find_entities(sentence))
    print("ML-based NER:", ml_ner.predict(sentence))
    print("-" * 80)
Output:
4. Implement a custom NER tagger for the Week-8 related programs and display the output
Code-I
import nltk
import networkx as nx
import matplotlib.pyplot as plt
from nltk.corpus import brown
from collections import defaultdict
from nltk.tokenize import word_tokenize
from nltk.tag import pos_tag
from nltk.corpus import wordnet
from nltk.chunk import ne_chunk

nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
nltk.download('brown')
nltk.download('maxent_ne_chunker')
nltk.download('words')

def build_dependency_graph():
    """Build and plot a weighted dependency graph for two example sentences."""
    edges = [
        ("root", "Book", 12), ("root", "that", 4), ("root", "flight", 4),
        ("Book", "that", 5), ("that", "Book", 6), ("that", "flight", 8),
        ("flight", "that", 7), ("flight", "Book", 5), ("Book", "flight", 7),
        ("root", "John", 9), ("root", "saw", 10), ("root", "Mary", 9),
        ("John", "saw", 20), ("saw", "John", 30), ("saw", "Mary", 30),
        ("Mary", "saw", 0), ("John", "Mary", 3), ("Mary", "John", 11)
    ]
    graph = nx.DiGraph()
    graph.add_weighted_edges_from(edges)

    plt.figure(figsize=(10, 6))
    pos = nx.spring_layout(graph)
    nx.draw(graph, pos, with_labels=True, node_color='lightblue',
            edge_color='gray', node_size=2000, font_size=10)
    edge_labels = {(u, v): d for u, v, d in edges}
    nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels, font_size=8)
    plt.title("Dependency Graph using CLE Algorithm")
    plt.show()
    return graph

def extract_sensitive_words():
    """Scan a sample of the Brown corpus for keywords from sensitive categories."""
    sensitive_categories = {
        "personal": ["name", "address", "phone", "email", "gender", "age"],
        "financial": ["salary", "income", "credit", "loan", "bank"],
        "social": ["friends", "family", "community", "relationship"]
    }
    brown_sample = brown.words()[:5000]
    detected_words = defaultdict(list)
    for word in brown_sample:
        for category, keywords in sensitive_categories.items():
            if word.lower() in keywords:
                detected_words[category].append(word)
    return detected_words

def map_sensitivity_scores(detected_words):
    """Attach a numeric sensitivity score to each detected word."""
    score_mapping = {"personal": 5, "financial": 4, "social": 3}
    scored_words = []
    for category, words in detected_words.items():
        for word in words:
            scored_words.append((word, category, score_mapping[category]))
    return scored_words

def retrieve_similar_words(detected_words):
    """Collect WordNet synonyms for the detected sensitive words."""
    synonyms = defaultdict(set)
    for category, words in detected_words.items():
        for word in words:
            synsets = wordnet.synsets(word)
            for syn in synsets:
                for lemma in syn.lemmas():
                    synonyms[category].add(lemma.name().replace('_', ' '))
    return synonyms

def named_entity_recognition(text):
    """Run NLTK's NE chunker and group the entities by type."""
    tokens = word_tokenize(text)
    pos_tags = pos_tag(tokens)
    named_entities = ne_chunk(pos_tags)
    persons, organizations, locations = set(), set(), set()
    for chunk in named_entities:
        if hasattr(chunk, "label"):
            entity_name = " ".join(c[0] for c in chunk)
            if chunk.label() == "PERSON":
                persons.add(entity_name)
            elif chunk.label() == "ORGANIZATION":
                organizations.add(entity_name)
            elif chunk.label() == "GPE":
                locations.add(entity_name)
    return {"PERSON": persons, "ORGANIZATION": organizations, "LOCATION": locations}

dependency_graph = build_dependency_graph()
sensitive_words = extract_sensitive_words()
scored_words = map_sensitivity_scores(sensitive_words)
similar_sensitive_words = retrieve_similar_words(sensitive_words)

sample_text = "John works at Google and lives in New York."
ner_results = named_entity_recognition(sample_text)

print("Detected Sensitive Words:")
for word, category, score in scored_words:
    print(f"{word} - {category} (Score: {score})")

print("\nWords Similar to Sensitive Words:")
for category, words in similar_sensitive_words.items():
    print(f"{category}: {', '.join(words)}")

print("\nNamed Entities Identified:")
for entity_type, entities in ner_results.items():
    print(f"{entity_type}: {', '.join(entities)}")
Code-II
import nltk
import pandas as pd
from nltk.corpus import gutenberg
from sklearn.feature_extraction.text import CountVectorizer
from gensim.models import Word2Vec

# Download necessary NLTK data
nltk.download("gutenberg")
nltk.download("punkt")

def extract_sensitive_words(text, file_name):
    """Extract words from the text that belong to a sensitive category."""
    sensitive_categories = {
        "personal": ["name", "age", "address", "gender", "identity"],
        "financial": ["bank", "credit", "debt", "loan", "salary"],
        "social": ["friend", "family", "community", "social", "relationship"]
    }
    words = set(nltk.word_tokenize(text.lower()))
    extracted_words = [(word, category, file_name)
                       for word in words
                       for category, word_list in sensitive_categories.items()
                       if word in word_list]
    return extracted_words

def process_corpus():
    """Run sensitive-word extraction over the selected Gutenberg files."""
    corpus_files = ['bible-kjv.txt', 'shakespeare-hamlet.txt']
    extracted_data = []
    for file in corpus_files:
        corpus_text = gutenberg.raw(file)
        extracted_data.extend(extract_sensitive_words(corpus_text, file))
    return extracted_data

def assign_sensitivity_score(extracted_data):
    """Attach a numeric sensitivity score to each extracted word."""
    scores = {"personal": 5, "financial": 4, "social": 3}
    return [(word, category, file_name, scores[category])
            for word, category, file_name in extracted_data]

def find_similar_words(corpus, target_words):
    """Train Word2Vec on the corpus and return the most similar words."""
    sentences = [nltk.word_tokenize(sent.lower()) for sent in nltk.sent_tokenize(corpus)]
    if len(sentences) < 2:
        print("Not enough sentences for Word2Vec training.")
        return {}
    model = Word2Vec(sentences, vector_size=100, window=5, min_count=1, workers=4)
    similar_words = {}
    for word in target_words:
        if word in model.wv.index_to_key:
            similar_words[word] = model.wv.most_similar(word, topn=5)
    return similar_words

if __name__ == "__main__":
    extracted_sensitive_words = process_corpus()
    sensitivity_scores = assign_sensitivity_score(extracted_sensitive_words)
    target_words = list(set(word for word, _, _, _ in sensitivity_scores))

    bible_corpus = gutenberg.raw('bible-kjv.txt')
    similar_words = find_similar_words(bible_corpus, target_words)

    df_sensitive_words = pd.DataFrame(sensitivity_scores,
                                      columns=["Word", "Category", "File Name", "Sensitivity Score"])
    df_sensitive_words.drop_duplicates(inplace=True)

    print("Extracted Sensitive Words:")
    print(df_sensitive_words.head(20).to_markdown(index=False))

    print("\nSimilar Words:")
    for word, similar_list in similar_words.items():
        print(f"{word}: {[sim_word for sim_word, _ in similar_list]}")
Output: