Statement of Completion#ec3738e2
Control Flow
medium
Cybersecurity Incident Response System: Building an Automated Threat Detection Framework
Resolution
Activities
Project.ipynb
Cybersecurity Incident Response System: Building an Automated Threat Detection Framework¶
1. Detect Failed Login Attempts with Pattern Matching¶
In [1]:
def detect_login_failures(logs):
"""
Analyze logs to detect IPs with multiple failed login attempts.
Args:
logs: List of log entry strings
Returns:
Dictionary mapping IPs to their number of failed login attempts (only IPs with 3+ failures)
"""
suspicious_ips = {}
for log in logs:
words = log.split()
match words:
# TODO: Add pattern for "Failed password for..." format
case [_, _, "Failed", "password", "for", _, "from", ip, *_]:
suspicious_ips[ip] = suspicious_ips.get(ip, 0) + 1
# TODO: Add pattern for "Authentication failure from..." format
case [_, _, "Authentication", "failure", "from", ip, *_]:
suspicious_ips[ip] = suspicious_ips.get(ip, 0) + 1
case _:
# Default case (log format not recognized)
pass
# Filter for IPs with 3 or more failures
return {ip: count for ip, count in suspicious_ips.items() if count >= 3}
In [3]:
logs = [
"2023-06-15 08:23:01 Failed password for admin from 192.168.1.5 port 22",
"2023-06-15 08:23:05 Failed password for admin from 192.168.1.5 port 22",
"2023-06-15 08:23:08 Failed password for admin from 192.168.1.5 port 22",
"2023-06-15 08:23:10 Failed password for root from 192.168.1.7 port 22",
"2023-06-15 08:23:12 Accepted password for user from 192.168.1.8 port 22",
"2023-06-15 08:24:01 Failed password for admin from 192.168.1.5 port 22",
"2023-06-15 08:25:03 Authentication failure from 192.168.1.7"
]
result = detect_login_failures(logs)
result
Out[3]:
{'192.168.1.5': 4}
2. Identify Multi-Stage Attack Patterns¶
In [ ]:
In [9]:
def detect_attack_sequence(logs):
"""
Detect multi-stage attack patterns in logs.
Args:
logs: List of log entry strings
Returns:
List of IP addresses that exhibit the suspicious pattern
"""
# Track the state of each IP address
ip_states = {} # Format: {ip: {'failed_logins': int, 'successful_login': bool, 'sensitive_access': bool}}
for log in logs:
words = log.split()
match words:
# TODO: Pattern for failed login attempts
case [_, _, "Failed", "password", "for", _, "from", ip, *_]:
ip_states.setdefault(ip, {'failed_logins': 0, 'successful_login': False, 'sensitive_access': False})
ip_states[ip]['failed_logins'] += 1
# TODO: Pattern for successful logins
case [_, _, "Accepted", "password", "for", _, "from", ip, *_]:
ip_states.setdefault(ip, {'failed_logins': 0, 'successful_login': False, 'sensitive_access': False})
if ip_states[ip]['failed_logins'] >= 2:
ip_states[ip]['successful_login'] = True
# TODO: Pattern for sensitive file access or privilege escalation
case [_, _, "access", "to", path, "by", _, "from", ip, *_] if "/etc/" in path:
ip_states.setdefault(ip, {'failed_logins': 0, 'successful_login': False, 'sensitive_access': False})
if ip_states[ip]['successful_login']:
ip_states[ip]['sensitive_access'] = True
case _:
# Unknown log format
pass
# TODO: Return IPs that have completed all stages of the attack pattern
suspicious_ips = [ip for ip, log in ip_states.items()
if (log['failed_logins'] >= 2 and log['successful_login'] and log['sensitive_access'])]
return suspicious_ips
In [10]:
logs = [
"2023-06-15 08:23:01 Failed password for admin from 192.168.1.5 port 22",
"2023-06-15 08:23:05 Failed password for admin from 192.168.1.5 port 22",
"2023-06-15 08:23:10 Accepted password for admin from 192.168.1.5 port 22",
"2023-06-15 08:23:15 access to /etc/shadow by admin from 192.168.1.5",
"2023-06-15 08:24:01 Failed password for root from 192.168.1.7 port 22",
"2023-06-15 08:24:05 Failed password for root from 192.168.1.7 port 22",
"2023-06-15 08:24:10 Accepted password for user from 192.168.1.7 port 22",
"2023-06-15 08:30:00 Failed password for admin from 192.168.1.9 port 22",
"2023-06-15 08:30:05 Accepted password for admin from 192.168.1.9 port 22"
]
result = detect_attack_sequence(logs)
result
Out[10]:
['192.168.1.5']
3. Which Python feature would be most appropriate for parsing different log formats in a security log analyzer that needs to extract specific fields based on varying patterns?¶
In [ ]:
4. Implement a Multi-Source Threat Scanner with Loop Else¶
In [15]:
def scan_security_feeds(feeds):
"""
Scan multiple security feeds for suspicious events.
Args:
feeds: List of lists, where each inner list contains event strings
Returns:
String containing either "ALERT: [event]" or "All systems normal"
"""
suspicious_keywords = ["unauthorized", "breach", "malware"]
# TODO: Implement nested loops to check each feed and each event
# TODO: Use a loop-else pattern to handle the case where no suspicious events are found
for feed in feeds:
for event in feed:
if any(keyword in event.lower() for keyword in suspicious_keywords):
return f"ALERT: {event}"
else:
return "All systems normal" # Replace with your implementation
In [16]:
feeds = [
["user login", "service started", "config updated", "user logout"],
["backup completed", "unauthorized access attempt", "service stopped"],
["system idle", "memory usage normal", "cpu usage normal"],
["user login", "malware detected", "file quarantined"]
]
# Test with feeds containing suspicious events
result1 = scan_security_feeds(feeds)
print(result1) # Should output: "ALERT: unauthorized access attempt"
# Test with feeds containing no suspicious events
safe_feeds = [
["user login", "service started", "config updated", "user logout"],
["backup completed", "service stopped"],
["system idle", "memory usage normal", "cpu usage normal"]
]
result2 = scan_security_feeds(safe_feeds)
print(result2) # Should output: "All systems normal"
ALERT: unauthorized access attempt All systems normal
5. In a security system that monitors multiple data sources, what is the primary advantage of using a custom iterator?¶
In [ ]:
6. Implement Exception Chaining for Security Alerts¶
In [18]:
class AuthenticationException(Exception):
"""Exception for authentication-related issues"""
pass
class BruteForceException(Exception):
"""Exception for brute force login attempts"""
pass
def detect_brute_force(username, failed_attempts):
"""
Check if a brute force attack is happening based on failed login attempts.
Args:
username: The username being used in the login attempts
failed_attempts: Number of consecutive failed login attempts
Raises:
BruteForceException: When 5 or more failed attempts are detected
(chained from an AuthenticationException)
"""
# TODO: Implement brute force detection with exception chaining
if failed_attempts >= 5:
auth_error = AuthenticationException(f"Authentication failed for user '{username}'")
raise BruteForceException(f"Possible brute force attack: {failed_attempts} failed attempts") from auth_error
In [19]:
try:
detect_brute_force("admin", 5)
print("No security issues detected")
except Exception as e:
print(f"Alert: {e}")
print(f"Exception type: {type(e).__name__}")
if e.__cause__:
print(f"Caused by: {e.__cause__}")
Alert: Possible brute force attack: 5 failed attempts Exception type: BruteForceException Caused by: Authentication failed for user 'admin'
7. When creating a hierarchy of security alert exceptions, which approach is most appropriate?¶
In [ ]:
8. Create a Memory-Efficient Network Traffic Analyzer¶
In [22]:
def analyze_network_packets(log_file):
"""
Process a network traffic log file and yield suspicious packets.
Args:
log_file: Path to the network traffic log file
Yields:
Dictionary containing details of each suspicious packet
"""
# TODO: Implement a generator function that:
# 1. Opens and reads the log file line by line
# 2. Parses each line into a packet dictionary
# 3. Yields suspicious packets based on the criteria
with open(log_file, 'r') as file:
for line in file:
# Skip empty lines or comments
if not line.strip() or line.strip().startswith('#'):
continue
# Parse the packet data from the log line
try:
# Assuming comma-separated format: source,destination,port,size,...
parts = line.strip().split(',')
packet = {
'source': parts[0],
'destination': parts[1],
'port': int(parts[2]),
'size': int(parts[3])
}
# Check if the packet matches our criteria for being suspicious
if 1 <= packet['port'] <= 1024 and packet['size'] > 1000:
yield packet
except (IndexError, ValueError) as e:
# Skip malformed lines
continue
In [23]:
def print_suspicious_packets(log_file, limit=5):
"""Print the first few suspicious packets from the log file."""
for i, packet in enumerate(analyze_network_packets(log_file)):
if i >= limit:
print(f"... and more (limiting output to {limit} packets)")
break
print(f"Suspicious packet: {packet}")
# Sample usage
print_suspicious_packets('data/network_traffic.txt')
Suspicious packet: {'source': '192.168.1.5', 'destination': '203.0.113.5', 'port': 22, 'size': 1420} Suspicious packet: {'source': '192.168.1.7', 'destination': '203.0.113.10', 'port': 80, 'size': 1820} Suspicious packet: {'source': '192.168.1.5', 'destination': '203.0.113.5', 'port': 22, 'size': 1550} Suspicious packet: {'source': '192.168.1.9', 'destination': '203.0.113.15', 'port': 25, 'size': 1200} Suspicious packet: {'source': '192.168.1.6', 'destination': '203.0.113.12', 'port': 21, 'size': 1350} ... and more (limiting output to 5 packets)
9. Which statement accurately describes the execution of a generator function in a network traffic analyzer?¶
In [ ]:
10: Create a Simple File Quarantine Manager¶
In [25]:
import os
import shutil
class SecurityException(Exception):
"""Security alert exception"""
pass
class FileQuarantine:
"""Context manager for safe file analysis"""
def __init__(self, file_path, quarantine_dir="quarantine"):
self.original_path = file_path
self.quarantine_dir = quarantine_dir
self.quarantined_path = None
def __enter__(self):
# Create quarantine directory if needed
if not os.path.exists(self.quarantine_dir):
os.makedirs(self.quarantine_dir)
# Copy file to quarantine
self.quarantined_path = os.path.join(
self.quarantine_dir,
os.path.basename(self.original_path)
)
shutil.copy2(self.original_path, self.quarantined_path)
return self.quarantined_path
def __exit__(self, exc_type, exc_val, exc_tb):
# TODO: Complete this method to remove the quarantined file
try:
if self.quarantined_path and os.path.exists(self.quarantined_path):
os.remove(self.quarantined_path)
except Exception as e:
print(f"Warning: Failed to delete quarantined file: {e}")
In [26]:
# Create a test file
with open("test_file.txt", "w") as f:
f.write("Test content")
# Simple analysis function
def analyze_file(path):
print(f"Analyzing file at: {path}")
return "Analysis complete"
# Test the context manager
with FileQuarantine("test_file.txt") as quarantine_path:
result = analyze_file(quarantine_path)
print(result)
# Check if cleanup worked
print(f"File still exists: {os.path.exists(quarantine_path)}")
Analyzing file at: quarantine/test_file.txt Analysis complete File still exists: False
11. When implementing resource management for a security quarantine system, what is the key benefit of defining your own context manager class rather than using contextlib.contextmanager?¶
In [ ]: