Lesson 2
Applying Data Filtering and Aggregation in User Data Management
Introduction

Welcome to today's lesson on applying data filtering and aggregation in a real-world scenario using a user management system. We'll start by building a foundational structure that can handle basic user operations. Then, we'll expand it by introducing more advanced functionalities that allow filtering and aggregating user data.

Starter Task Methods

In our starter task, we will implement a class that manages basic operations on a collection of user data, specifically handling adding new users, retrieving user profiles, and updating user profiles.

Here are the starter task methods:

  • add_user(self, user_id: str, age: int, country: str, subscribed: bool) -> bool - adds a new user with the specified attributes. Returns True if the user was added successfully and False if a user with the same user_id already exists.
  • get_user(self, user_id: str) -> dict[str, int | str | bool] | None - returns the user's profile as a dictionary if the user exists; otherwise, returns None.
  • update_user(self, user_id: str, age: int | None, country: str | None, subscribed: bool | None) -> bool - updates the user's profile based on non-None parameters. Returns True if the user exists and was updated, False otherwise.
Solution for the Starter Task

Here is the implementation of our starter task:

Python
class UserManager:
    def __init__(self):
        self.users = {}

    def add_user(self, user_id: str, age: int, country: str, subscribed: bool) -> bool:
        if user_id in self.users:
            return False
        self.users[user_id] = {"age": age, "country": country, "subscribed": subscribed}
        return True

    def get_user(self, user_id: str) -> dict[str, int | str | bool] | None:
        return self.users.get(user_id, None)

    def update_user(self, user_id: str, age: int | None, country: str | None, subscribed: bool | None) -> bool:
        if user_id not in self.users:
            return False
        if age is not None:
            self.users[user_id]["age"] = age
        if country is not None:
            self.users[user_id]["country"] = country
        if subscribed is not None:
            self.users[user_id]["subscribed"] = subscribed
        return True

# Example usage
um = UserManager()
print(um.add_user("u1", 25, "USA", True))  # True
print(um.add_user("u2", 30, "Canada", False))  # True
print(um.add_user("u1", 22, "Mexico", True))  # False
print(um.get_user("u1"))  # {"age": 25, "country": "USA", "subscribed": True}
print(um.update_user("u1", 26, None, None))  # True
print(um.update_user("u3", 19, "UK", False))  # False

This implementation covers all our starter methods. Let's move forward and introduce more complex functionalities.
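To make the "non-None parameters" rule of update_user concrete, here is a minimal standalone sketch. The helper apply_update and the sample profile are hypothetical names used only for illustration; they are not part of the lesson's class. The point is that the check is `is not None` rather than a truthiness test, so a value like False is still applied.

```python
# Hypothetical helper mirroring the "None means leave unchanged" rule of update_user.
def apply_update(profile: dict, age=None, country=None, subscribed=None) -> dict:
    # Collect the candidate changes, then apply only those that were supplied.
    changes = {"age": age, "country": country, "subscribed": subscribed}
    for field, value in changes.items():
        if value is not None:  # False and 0 are valid values, so test for None explicitly
            profile[field] = value
    return profile

profile = {"age": 25, "country": "USA", "subscribed": True}
apply_update(profile, age=26)            # only the age changes
apply_update(profile, subscribed=False)  # False is not None, so it is applied
print(profile)  # {'age': 26, 'country': 'USA', 'subscribed': False}
```

Had the check been `if value:`, the second call would have silently left the user subscribed.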

Introducing New Methods for Data Filtering and Aggregation

With our foundational structure in place, it's time to add functionalities for filtering user data and aggregating statistics.

Here are the new methods to implement:

  • filter_users(self, min_age: int | None, max_age: int | None, country: str | None, subscribed: bool | None) -> list[str]:
    • Returns the list of user IDs that match all of the specified criteria. Any criterion can be None, in which case it is not applied during filtering. The age bounds min_age and max_age are inclusive.
  • aggregate_stats(self) -> dict[str, int | float] - returns statistics in the form of a dictionary:
    • total_users: Total number of users
    • average_age: Average age of all users (rounded down to the nearest integer)
    • subscribed_ratio: Ratio of subscribed users to total users (as a float with two decimals)
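The two rounding rules above differ, so a small worked example may help. This is only a sketch of the arithmetic on sample values chosen for illustration (three users aged 25, 30, and 22, two of them subscribed); the variable names are assumptions, not part of the required interface.

```python
ages = [25, 30, 22]                     # sample ages, chosen for illustration
subscribed_flags = [True, False, True]  # sample subscription flags

total_users = len(ages)

# Floor division rounds down: 77 // 3 is 25, even though 77 / 3 is about 25.67.
average_age = sum(ages) // total_users

# True counts as 1 when summed, so this is 2 / 3, rounded to two decimals.
subscribed_ratio = round(sum(subscribed_flags) / total_users, 2)

print(average_age, subscribed_ratio)  # 25 0.67
```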
Step-by-Step Implementation

Step 1: Adding 'filter_users' Method

This method filters users based on the criteria provided. Let's see how it works:

Python
class UserManager:
    # Existing methods...

    def filter_users(self, min_age: int | None, max_age: int | None, country: str | None, subscribed: bool | None) -> list[str]:
        filtered_users = []
        for user_id, profile in self.users.items():
            if min_age is not None and profile['age'] < min_age:
                continue
            if max_age is not None and profile['age'] > max_age:
                continue
            if country is not None and profile['country'] != country:
                continue
            if subscribed is not None and profile['subscribed'] != subscribed:
                continue
            filtered_users.append(user_id)
        return filtered_users

# Example usage of the new method
um = UserManager()
um.add_user("u1", 25, "USA", True)
um.add_user("u2", 30, "Canada", False)
um.add_user("u3", 22, "USA", True)
print(um.filter_users(20, 30, "USA", True))  # ["u1", "u3"]
print(um.filter_users(None, 28, None, None))  # ["u1", "u3"]
print(um.filter_users(None, None, "Canada", False))  # ["u2"]
  • The filter_users method filters users based on min_age, max_age, country, and subscribed status criteria.
  • It iterates over the users dictionary and checks each user's profile against the provided criteria.
  • Users who meet all the criteria are added to the filtered_users list, which is then returned.
  • The example usage shows how to add users and filter them based on different criteria.
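The same "skip a criterion when it is None" logic can also be written as a single predicate. The sketch below is an alternative formulation, not the lesson's implementation: the standalone function filter_ids, its default arguments, and the sample users dictionary are all assumptions made for illustration.

```python
from typing import Optional

def filter_ids(users: dict, min_age: Optional[int] = None, max_age: Optional[int] = None,
               country: Optional[str] = None, subscribed: Optional[bool] = None) -> list:
    """Return IDs of users whose profile satisfies every criterion that is not None."""
    def matches(profile: dict) -> bool:
        # Each condition is "criterion not given OR profile satisfies it".
        return (
            (min_age is None or profile["age"] >= min_age)
            and (max_age is None or profile["age"] <= max_age)
            and (country is None or profile["country"] == country)
            and (subscribed is None or profile["subscribed"] == subscribed)
        )
    # Dictionaries preserve insertion order, so IDs come back in the order users were added.
    return [user_id for user_id, profile in users.items() if matches(profile)]

# Sample data, chosen for illustration.
users = {
    "u1": {"age": 25, "country": "USA", "subscribed": True},
    "u2": {"age": 30, "country": "Canada", "subscribed": False},
    "u3": {"age": 22, "country": "USA", "subscribed": True},
}
print(filter_ids(users, 20, 30, "USA", True))  # ['u1', 'u3']
print(filter_ids(users, max_age=28))           # ['u1', 'u3']
print(filter_ids(users))                       # ['u1', 'u2', 'u3']
```

The loop-with-continue version in the lesson and this predicate version should behave the same; which one reads better is largely a matter of taste.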
Step 2: Adding 'aggregate_stats' Method

This method aggregates statistics from the user profiles. Let's implement it:

Python
class UserManager:
    # Existing methods...

    def aggregate_stats(self) -> dict[str, int | float]:
        total_users = len(self.users)
        if total_users == 0:
            return {"total_users": 0, "average_age": 0, "subscribed_ratio": 0.00}

        total_age = sum(profile["age"] for profile in self.users.values())
        subscribed_users = sum(1 for profile in self.users.values() if profile["subscribed"])

        average_age = total_age // total_users
        subscribed_ratio = round(subscribed_users / total_users, 2)

        return {"total_users": total_users, "average_age": average_age, "subscribed_ratio": subscribed_ratio}

# Using `um` from the previous section
print(um.aggregate_stats())  # {"total_users": 3, "average_age": 25, "subscribed_ratio": 0.67}

The aggregate_stats method returns aggregate statistics about the users as a dictionary. It first computes total_users, the number of stored users. If there are none, it returns zeroed statistics immediately, which also avoids dividing by zero. Otherwise, it sums all ages into total_age and counts the subscribed users as subscribed_users. It then computes average_age with integer division (total_age // total_users), which rounds down, and subscribed_ratio by dividing subscribed_users by total_users and rounding to two decimal places. The returned dictionary contains total_users, average_age, and subscribed_ratio.
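The early return for an empty collection matters because both statistics divide by the number of users. The following standalone sketch isolates that guard; the function name summarize and the sample data are hypothetical and exist only to illustrate the behavior.

```python
def summarize(users: dict) -> dict:
    """Hypothetical standalone version of the aggregation, shown to highlight the empty-case guard."""
    total = len(users)
    if total == 0:
        # Without this guard, the divisions below would raise ZeroDivisionError.
        return {"total_users": 0, "average_age": 0, "subscribed_ratio": 0.0}

    total_age = sum(profile["age"] for profile in users.values())
    subscribed = sum(1 for profile in users.values() if profile["subscribed"])
    return {
        "total_users": total,
        "average_age": total_age // total,            # floor division rounds down
        "subscribed_ratio": round(subscribed / total, 2),
    }

print(summarize({}))
# {'total_users': 0, 'average_age': 0, 'subscribed_ratio': 0.0}

sample = {
    "u1": {"age": 25, "subscribed": True},
    "u2": {"age": 30, "subscribed": False},
}
print(summarize(sample))
# {'total_users': 2, 'average_age': 27, 'subscribed_ratio': 0.5}
```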

The Final Solution

Here's the complete UserManager class with all methods, including the new ones for filtering and aggregation:

Python
class UserManager:
    def __init__(self):
        self.users = {}

    def add_user(self, user_id: str, age: int, country: str, subscribed: bool) -> bool:
        if user_id in self.users:
            return False
        self.users[user_id] = {"age": age, "country": country, "subscribed": subscribed}
        return True

    def get_user(self, user_id: str) -> dict[str, int | str | bool] | None:
        return self.users.get(user_id, None)

    def update_user(self, user_id: str, age: int | None, country: str | None, subscribed: bool | None) -> bool:
        if user_id not in self.users:
            return False
        if age is not None:
            self.users[user_id]["age"] = age
        if country is not None:
            self.users[user_id]["country"] = country
        if subscribed is not None:
            self.users[user_id]["subscribed"] = subscribed
        return True

    def filter_users(self, min_age: int | None, max_age: int | None, country: str | None, subscribed: bool | None) -> list[str]:
        filtered_users = []
        for user_id, profile in self.users.items():
            if min_age is not None and profile["age"] < min_age:
                continue
            if max_age is not None and profile["age"] > max_age:
                continue
            if country is not None and profile["country"] != country:
                continue
            if subscribed is not None and profile["subscribed"] != subscribed:
                continue
            filtered_users.append(user_id)
        return filtered_users

    def aggregate_stats(self) -> dict[str, int | float]:
        total_users = len(self.users)
        if total_users == 0:
            return {"total_users": 0, "average_age": 0, "subscribed_ratio": 0.00}

        total_age = sum(profile["age"] for profile in self.users.values())
        subscribed_users = sum(1 for profile in self.users.values() if profile["subscribed"])

        average_age = total_age // total_users
        subscribed_ratio = round(subscribed_users / total_users, 2)

        return {"total_users": total_users, "average_age": average_age, "subscribed_ratio": subscribed_ratio}

# Example usage
um = UserManager()
um.add_user("u1", 25, "USA", True)
um.add_user("u2", 30, "Canada", False)
um.add_user("u3", 22, "USA", True)

print(um.filter_users(20, 30, "USA", True))  # ["u1", "u3"]
print(um.filter_users(None, 28, None, None))  # ["u1", "u3"]
print(um.filter_users(None, None, "Canada", False))  # ["u2"]

print(um.aggregate_stats())  # {"total_users": 3, "average_age": 25, "subscribed_ratio": 0.67}
Lesson Summary

Great job! Today, you've learned how to effectively handle user data by implementing advanced functionalities like filtering and aggregation on top of a basic system. This is a critical skill in real-life software development, where you often need to extend existing systems to meet new requirements.

I encourage you to practice solving similar challenges to solidify your understanding of data filtering and aggregation. Happy coding, and see you in the next lesson!
