Welcome to today's lesson on applying data filtering and aggregation in a real-world scenario using a user management system. We'll start by building a foundational structure that can handle basic user operations. Then, we'll expand it by introducing more advanced functionalities that allow filtering and aggregating user data.
In our starter task, we will implement a class that manages basic operations on a collection of user data, specifically handling adding new users, retrieving user profiles, and updating user profiles.
Here are the starter task methods:
- add_user(self, user_id: str, age: int, country: str, subscribed: bool) -> bool - adds a new user with the specified attributes. Returns True if the user was added successfully and False if a user with the same user_id already exists.
- get_user(self, user_id: str) -> dict[str, int | str | bool] | None - returns the user's profile as a dictionary if the user exists; otherwise, returns None.
- update_user(self, user_id: str, age: int | None, country: str | None, subscribed: bool | None) -> bool - updates the user's profile based on non-None parameters. Returns True if the user exists and was updated, False otherwise.

Here is the implementation of our starter task:
```python
class UserManager:
    def __init__(self):
        self.users = {}

    def add_user(self, user_id: str, age: int, country: str, subscribed: bool) -> bool:
        if user_id in self.users:
            return False
        self.users[user_id] = {"age": age, "country": country, "subscribed": subscribed}
        return True

    def get_user(self, user_id: str) -> dict[str, int | str | bool] | None:
        return self.users.get(user_id, None)

    def update_user(self, user_id: str, age: int | None, country: str | None, subscribed: bool | None) -> bool:
        if user_id not in self.users:
            return False
        if age is not None:
            self.users[user_id]["age"] = age
        if country is not None:
            self.users[user_id]["country"] = country
        if subscribed is not None:
            self.users[user_id]["subscribed"] = subscribed
        return True

# Example usage
um = UserManager()
print(um.add_user("u1", 25, "USA", True))      # True
print(um.add_user("u2", 30, "Canada", False))  # True
print(um.add_user("u1", 22, "Mexico", True))   # False
print(um.get_user("u1"))                       # {"age": 25, "country": "USA", "subscribed": True}
print(um.update_user("u1", 26, None, None))    # True
print(um.update_user("u3", 19, "UK", False))   # False
```
This implementation covers all our starter methods. Let's move forward and introduce more complex functionalities.
With our foundational structure in place, it's time to add functionalities for filtering user data and aggregating statistics.
Here are the new methods to implement:
- filter_users(self, min_age: int | None, max_age: int | None, country: str | None, subscribed: bool | None) -> list[str] - returns the IDs of the users who match all of the given criteria. Any criterion can be None, meaning that criterion should not be applied during filtering.
- aggregate_stats(self) -> dict[str, int | float] - returns statistics in the form of a dictionary:
  - total_users: Total number of users
  - average_age: Average age of all users (rounded down to the nearest integer)
  - subscribed_ratio: Ratio of subscribed users to total users (as a float rounded to two decimal places)

The filter_users method filters users based on the criteria provided. Let's see how it works:
```python
class UserManager:
    # Existing methods...

    def filter_users(self, min_age: int | None, max_age: int | None, country: str | None, subscribed: bool | None) -> list[str]:
        filtered_users = []
        for user_id, profile in self.users.items():
            if min_age is not None and profile["age"] < min_age:
                continue
            if max_age is not None and profile["age"] > max_age:
                continue
            if country is not None and profile["country"] != country:
                continue
            if subscribed is not None and profile["subscribed"] != subscribed:
                continue
            filtered_users.append(user_id)
        return filtered_users

# Example usage of the new method
um = UserManager()
um.add_user("u1", 25, "USA", True)
um.add_user("u2", 30, "Canada", False)
um.add_user("u3", 22, "USA", True)
print(um.filter_users(20, 30, "USA", True))         # ["u1", "u3"]
print(um.filter_users(None, 28, None, None))        # ["u1", "u3"]
print(um.filter_users(None, None, "Canada", False)) # ["u2"]
```
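As a side note, the same checks can also be written as a single predicate plus a list comprehension. The following is only a sketch and not part of the lesson's UserManager: the standalone functions matches and filter_user_ids, their keyword defaults, and the sample users dictionary are assumptions made up for illustration. It also makes visible that both age bounds are inclusive, because a profile is rejected only when its age is strictly below min_age or strictly above max_age.

```python
from __future__ import annotations


def matches(profile: dict, min_age: int | None, max_age: int | None,
            country: str | None, subscribed: bool | None) -> bool:
    """Return True if the profile passes every criterion that is not None."""
    return (
        (min_age is None or profile["age"] >= min_age)
        and (max_age is None or profile["age"] <= max_age)
        and (country is None or profile["country"] == country)
        and (subscribed is None or profile["subscribed"] == subscribed)
    )


def filter_user_ids(users: dict[str, dict], min_age: int | None = None,
                    max_age: int | None = None, country: str | None = None,
                    subscribed: bool | None = None) -> list[str]:
    """Hypothetical standalone counterpart of UserManager.filter_users."""
    return [uid for uid, profile in users.items()
            if matches(profile, min_age, max_age, country, subscribed)]


# Sample data mirroring the lesson's example users (assumed shape of self.users).
users = {
    "u1": {"age": 25, "country": "USA", "subscribed": True},
    "u2": {"age": 30, "country": "Canada", "subscribed": False},
    "u3": {"age": 22, "country": "USA", "subscribed": True},
}

print(filter_user_ids(users, min_age=22, max_age=25))  # ['u1', 'u3'] (bounds are inclusive)
print(filter_user_ids(users, subscribed=False))        # ['u2']
print(filter_user_ids(users))                          # ['u1', 'u2', 'u3'] (no criteria applied)
```

Whether the explicit loop or the comprehension reads better is largely a matter of taste; the loop with early continue statements is arguably easier to step through when debugging.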
- The filter_users method filters users based on the min_age, max_age, country, and subscribed status criteria.
- It iterates over the users dictionary and checks each user's profile against the provided criteria.
- Users who meet all the criteria are added to the filtered_users list, which is then returned.

The aggregate_stats method aggregates statistics from the user profiles. Let's implement it:
```python
class UserManager:
    # Existing methods...

    def aggregate_stats(self) -> dict[str, int | float]:
        total_users = len(self.users)
        if total_users == 0:
            return {"total_users": 0, "average_age": 0, "subscribed_ratio": 0.00}

        total_age = sum(profile["age"] for profile in self.users.values())
        subscribed_users = sum(1 for profile in self.users.values() if profile["subscribed"])

        average_age = total_age // total_users
        subscribed_ratio = round(subscribed_users / total_users, 2)

        return {"total_users": total_users, "average_age": average_age, "subscribed_ratio": subscribed_ratio}

# Using `um` from the previous section
print(um.aggregate_stats())  # {"total_users": 3, "average_age": 25, "subscribed_ratio": 0.67}
```
The aggregate_stats method calculates and returns aggregate statistics about the users in the form of a dictionary. It first determines total_users, the total number of users. If there are no users, it returns a dictionary with zeroed statistics. Otherwise, it calculates total_age by summing the ages of all users and counts the subscribed users as subscribed_users. It then computes average_age by integer division of total_age by total_users, and calculates subscribed_ratio by dividing subscribed_users by total_users and rounding the result to two decimal places. The resulting dictionary contains total_users, average_age, and subscribed_ratio.
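To make the arithmetic concrete, here is a small sketch that recomputes the two derived statistics for the three example users (ages 25, 30, and 22, two of them subscribed). The list names ages and subscribed_flags are chosen for illustration only and do not appear in the class.

```python
# Ages and subscription flags of the lesson's example users u1, u2, u3.
ages = [25, 30, 22]
subscribed_flags = [True, False, True]

total_users = len(ages)
total_age = sum(ages)                     # 25 + 30 + 22 = 77
average_age = total_age // total_users    # 77 // 3 = 25 (the true mean is about 25.67)
subscribed_users = sum(subscribed_flags)  # True counts as 1, so this is 2
subscribed_ratio = round(subscribed_users / total_users, 2)  # 2 / 3 = 0.666... -> 0.67

print(average_age)       # 25
print(subscribed_ratio)  # 0.67
```

Because round returns an ordinary float, a ratio such as one half prints as 0.5 rather than 0.50. If a fixed two-digit display matters, formatting the value at print time, for example with f"{subscribed_ratio:.2f}", is one option.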
Here's the complete UserManager class with all methods, including the new ones for filtering and aggregation:
```python
class UserManager:
    def __init__(self):
        # Maps user_id -> profile dict with "age", "country", and "subscribed" keys.
        self.users = {}

    def add_user(self, user_id: str, age: int, country: str, subscribed: bool) -> bool:
        if user_id in self.users:
            return False
        self.users[user_id] = {"age": age, "country": country, "subscribed": subscribed}
        return True

    def get_user(self, user_id: str) -> dict[str, int | str | bool] | None:
        return self.users.get(user_id, None)

    def update_user(self, user_id: str, age: int | None, country: str | None, subscribed: bool | None) -> bool:
        if user_id not in self.users:
            return False
        # Only fields passed as non-None values are overwritten.
        if age is not None:
            self.users[user_id]["age"] = age
        if country is not None:
            self.users[user_id]["country"] = country
        if subscribed is not None:
            self.users[user_id]["subscribed"] = subscribed
        return True

    def filter_users(self, min_age: int | None, max_age: int | None, country: str | None, subscribed: bool | None) -> list[str]:
        filtered_users = []
        for user_id, profile in self.users.items():
            # Skip the user as soon as any active (non-None) criterion fails.
            if min_age is not None and profile["age"] < min_age:
                continue
            if max_age is not None and profile["age"] > max_age:
                continue
            if country is not None and profile["country"] != country:
                continue
            if subscribed is not None and profile["subscribed"] != subscribed:
                continue
            filtered_users.append(user_id)
        return filtered_users

    def aggregate_stats(self) -> dict[str, int | float]:
        total_users = len(self.users)
        if total_users == 0:
            # Avoid division by zero when there are no users.
            return {"total_users": 0, "average_age": 0, "subscribed_ratio": 0.00}

        total_age = sum(profile["age"] for profile in self.users.values())
        subscribed_users = sum(1 for profile in self.users.values() if profile["subscribed"])

        average_age = total_age // total_users
        subscribed_ratio = round(subscribed_users / total_users, 2)

        return {"total_users": total_users, "average_age": average_age, "subscribed_ratio": subscribed_ratio}

# Example usage
um = UserManager()
um.add_user("u1", 25, "USA", True)
um.add_user("u2", 30, "Canada", False)
um.add_user("u3", 22, "USA", True)

print(um.filter_users(20, 30, "USA", True))         # ["u1", "u3"]
print(um.filter_users(None, 28, None, None))        # ["u1", "u3"]
print(um.filter_users(None, None, "Canada", False)) # ["u2"]

print(um.aggregate_stats())  # {"total_users": 3, "average_age": 25, "subscribed_ratio": 0.67}
```
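Filtering and aggregation also compose: the IDs returned by a filter can drive an aggregation over just that subset. The following is a rough sketch under assumptions, not part of the lesson's class. stats_for is a hypothetical helper, and it operates on a plain dictionary shaped like self.users so that the snippet runs on its own.

```python
from __future__ import annotations


def stats_for(users: dict[str, dict], user_ids: list[str]) -> dict[str, int | float]:
    """Hypothetical helper: the aggregate_stats calculation restricted to user_ids."""
    # Unknown IDs are silently ignored in this sketch.
    profiles = [users[uid] for uid in user_ids if uid in users]
    total = len(profiles)
    if total == 0:
        return {"total_users": 0, "average_age": 0, "subscribed_ratio": 0.0}
    total_age = sum(p["age"] for p in profiles)
    subscribed = sum(1 for p in profiles if p["subscribed"])
    return {
        "total_users": total,
        "average_age": total_age // total,
        "subscribed_ratio": round(subscribed / total, 2),
    }


# Sample data mirroring the lesson's example users (assumed shape of self.users).
users = {
    "u1": {"age": 25, "country": "USA", "subscribed": True},
    "u2": {"age": 30, "country": "Canada", "subscribed": False},
    "u3": {"age": 22, "country": "USA", "subscribed": True},
}

# Stand-in for um.filter_users(None, None, "USA", None).
usa_ids = [uid for uid, p in users.items() if p["country"] == "USA"]

print(stats_for(users, usa_ids))  # {'total_users': 2, 'average_age': 23, 'subscribed_ratio': 1.0}
```

Inside the class, the same idea could take the form of optional filter parameters on aggregate_stats, but that would change the method signature given in the task, so it is left here as a separate helper.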
Great job! Today, you've learned how to effectively handle user data by implementing advanced functionalities like filtering and aggregation on top of a basic system. This is a critical skill in real-life software development, where you often need to extend existing systems to meet new requirements.
I encourage you to practice solving similar challenges to solidify your understanding of data filtering and aggregation. Happy coding, and see you in the next lesson!