Lesson 3

Mastering Data Aggregation and Formatting in Python


Welcome to our lesson on data aggregation and JSON formatting for data streams in Python. We'll start by building a basic sales record aggregator. Then we'll extend it with filtering, aggregation, date-range queries, and JSON output. By the end of this session, you'll be able to aggregate, filter, and format records from a data stream.

Starter Task Methods and Their Definitions

To begin, we'll implement a basic sales record aggregator. Here are the methods we'll be focusing on:

  • add_sale(self, sale_id: str, amount: float) -> None - Adds a sale record with a unique identifier sale_id and an amount. If a sale with the same sale_id already exists, it updates the amount.
  • get_sale(self, sale_id: str) -> float | None - Retrieves the sale amount associated with the sale_id. If the sale does not exist, it returns None.
  • delete_sale(self, sale_id: str) -> bool - Deletes the sale record with the given sale_id. Returns True if the sale was deleted and False if the sale does not exist.

Are these methods clear so far? Great! Let's now look at how we would implement them.

Starter Task Solution

Here is the complete code for the starter task:

    class SalesAggregator:
        def __init__(self):
            self.sales = {}

        def add_sale(self, sale_id: str, amount: float) -> None:
            self.sales[sale_id] = amount

        def get_sale(self, sale_id: str) -> float | None:
            return self.sales.get(sale_id)

        def delete_sale(self, sale_id: str) -> bool:
            if sale_id in self.sales:
                del self.sales[sale_id]
                return True
            return False

    # Example Usage
    aggregator = SalesAggregator()

    # Add sales
    aggregator.add_sale('001', 100.50)
    aggregator.add_sale('002', 200.75)

    # Get sale
    print(aggregator.get_sale('001'))  # Output: 100.5

    # Delete sale
    print(aggregator.delete_sale('002'))  # Output: True
    print(aggregator.get_sale('002'))  # Output: None


  • The __init__ method initializes an empty dictionary to store sales records.
  • The add_sale method adds a new sale or updates the amount for an existing sale ID.
  • The get_sale method retrieves the amount for a given sale ID or returns None if the sale does not exist.
  • The delete_sale method removes the sale record for the given sale ID and returns True, or returns False if the sale does not exist.
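
The update and delete behavior described in these points can be checked with a short sketch. It repeats the starter class so that it runs on its own; the sale ID and amounts are made-up illustration values, and the `__future__` import is only there so the `float | None` annotation also parses on Python versions before 3.10.

```python
from __future__ import annotations  # keeps `float | None` valid on older Python 3


class SalesAggregator:
    """Starter version: maps each sale_id directly to its amount."""

    def __init__(self):
        self.sales = {}

    def add_sale(self, sale_id: str, amount: float) -> None:
        # Assigning to an existing key overwrites the previous amount.
        self.sales[sale_id] = amount

    def get_sale(self, sale_id: str) -> float | None:
        return self.sales.get(sale_id)

    def delete_sale(self, sale_id: str) -> bool:
        if sale_id in self.sales:
            del self.sales[sale_id]
            return True
        return False


aggregator = SalesAggregator()
aggregator.add_sale("001", 100.50)
aggregator.add_sale("001", 120.00)  # same ID: updates instead of adding a second record

updated_amount = aggregator.get_sale("001")
record_count = len(aggregator.sales)
first_delete = aggregator.delete_sale("001")
second_delete = aggregator.delete_sale("001")  # the record is already gone

print(updated_amount)  # 120.0
print(record_count)    # 1
print(first_delete)    # True
print(second_delete)   # False
```

Because the dictionary is keyed by sale_id, "add" and "update" are the same assignment, which is why add_sale needs no existence check.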

Now that we have our basic aggregator, let's extend it to include more advanced functionalities.

New Methods and Their Definitions

To increase the complexity and usefulness of our sales aggregator, we'll introduce some new methods. These new methods will handle advanced data aggregation, filtering, and formatting functionalities involving JSON.

  • aggregate_sales(self, min_amount: float = 0) -> dict - Returns a dictionary with the number of sales whose amount is strictly greater than min_amount and the sum of those amounts. The dictionary format looks like this:

    {
        "total_sales": int,
        "total_amount": float
    }

  • format_sales(self, min_amount: float = 0) -> str - Returns a JSON string containing the sales whose amount exceeds min_amount, together with the aggregated statistics for those sales.

  • add_sale(self, sale_id: str, amount: float, date: str) -> None - Adds or updates a sale record with a unique identifier sale_id, amount, and a date in the format "YYYY-MM-DD".

  • get_sales_in_date_range(self, start_date: str, end_date: str) -> list[dict] - Retrieves all sales that occurred within the given date range, inclusive. Each sale includes sale_id, amount, and date.

Let's implement these methods step-by-step.

Step 1: Enhancing the 'add_sale' Method to Include Date

We'll first modify the add_sale method to accept a date.

    def add_sale(self, sale_id: str, amount: float, date: str) -> None:
        self.sales[sale_id] = {"amount": amount, "date": date}

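Each sale record now stores a date in addition to the amount. As a consequence, get_sale returns this dictionary rather than a bare float. Note that add_sale stores the date string as given and does not check its format.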
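
Here is a minimal sketch of the nested record that the new add_sale produces. It also adds a date check with `datetime.strptime`; that check is an assumption made for illustration and is not part of the lesson's add_sale. The second sale and its malformed date are hypothetical values.

```python
from datetime import datetime


class SalesAggregator:
    def __init__(self):
        self.sales = {}

    def add_sale(self, sale_id: str, amount: float, date: str) -> None:
        # Assumption (not in the lesson): fail early on unparseable dates.
        # strptime raises ValueError when the string does not match the format.
        datetime.strptime(date, "%Y-%m-%d")
        self.sales[sale_id] = {"amount": amount, "date": date}

    def get_sale(self, sale_id: str):
        # Returns the whole record (a dict), or None if the ID is unknown.
        return self.sales.get(sale_id)


aggregator = SalesAggregator()
aggregator.add_sale("001", 100.50, "2023-01-01")
record = aggregator.get_sale("001")
print(record)  # {'amount': 100.5, 'date': '2023-01-01'}

try:
    aggregator.add_sale("002", 200.75, "01/15/2023")  # wrong format
    rejected = False
except ValueError:
    rejected = True
print(rejected)  # True
```

Whether to validate on insert is a design choice. Checking early keeps bad dates out of the store, so later date-range queries don't fail while parsing.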

Step 2: Implementing the 'aggregate_sales' Method

Now, we create the aggregate_sales method:

    def aggregate_sales(self, min_amount: float = 0) -> dict:
        total_sales = 0
        total_amount = 0.0
        for sale in self.sales.values():
            if sale["amount"] > min_amount:
                total_sales += 1
                total_amount += sale["amount"]
        return {"total_sales": total_sales, "total_amount": total_amount}

    # Create an instance of SalesAggregator
    aggregator = SalesAggregator()

    # Add sales with date
    aggregator.add_sale('001', 100.50, '2023-01-01')
    aggregator.add_sale('002', 200.75, '2023-01-15')

    # Aggregate sales
    print(aggregator.aggregate_sales(min_amount=50))
    # Output: {'total_sales': 2, 'total_amount': 301.25}

This method iterates through the sales, counting and summing those whose amount exceeds min_amount. The comparison is strict, so a sale whose amount equals min_amount is excluded.
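
The effect of the strict `>` comparison is easiest to see at the threshold itself. The sketch below reuses the lesson's aggregate_sales and adds a hypothetical zero-amount sale ('003') to show that the default min_amount of 0 already filters it out.

```python
class SalesAggregator:
    def __init__(self):
        self.sales = {}

    def add_sale(self, sale_id: str, amount: float, date: str) -> None:
        self.sales[sale_id] = {"amount": amount, "date": date}

    def aggregate_sales(self, min_amount: float = 0) -> dict:
        total_sales = 0
        total_amount = 0.0
        for sale in self.sales.values():
            if sale["amount"] > min_amount:  # strict: equal amounts are skipped
                total_sales += 1
                total_amount += sale["amount"]
        return {"total_sales": total_sales, "total_amount": total_amount}


aggregator = SalesAggregator()
aggregator.add_sale("001", 100.50, "2023-01-01")
aggregator.add_sale("002", 200.75, "2023-01-15")
aggregator.add_sale("003", 0.0, "2023-02-01")  # hypothetical zero-amount sale

default_stats = aggregator.aggregate_sales()                # threshold 0
edge_stats = aggregator.aggregate_sales(min_amount=100.50)  # threshold equals sale '001'

print(default_stats)  # {'total_sales': 2, 'total_amount': 301.25}
print(edge_stats)     # {'total_sales': 1, 'total_amount': 200.75}
```

If zero-amount or exactly-at-threshold sales should count, the comparison would need to be `>=` instead; the lesson's version uses `>`.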

Step 3: Implementing the 'format_sales' Method

Next, we'll create the format_sales method to output data in JSON format.

    import json

    def format_sales(self, min_amount: float = 0) -> str:
        filtered_sales = [sale for sale in self.sales.items() if sale[1]["amount"] > min_amount]
        statistics = self.aggregate_sales(min_amount)

        result = {
            "sales": [{"sale_id": sale_id, "amount": sale["amount"], "date": sale["date"]} for sale_id, sale in filtered_sales],
            "statistics": statistics
        }
        return json.dumps(result)

    # Format sales to JSON
    print(aggregator.format_sales(min_amount=50))
    # Output: {"sales": [{"sale_id": "001", "amount": 100.5, "date": "2023-01-01"}, {"sale_id": "002", "amount": 200.75, "date": "2023-01-15"}], "statistics": {"total_sales": 2, "total_amount": 301.25}}

This method filters the sales by min_amount, reuses aggregate_sales for the statistics, and serializes both into a single JSON string with json.dumps.
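
Since format_sales returns a string, a consumer has to parse it before reading individual fields. The sketch below round-trips the output through `json.loads` and then pretty-prints it with the standard `indent` parameter of `json.dumps`. The threshold of 150 and the pretty-printing step are illustration choices, not part of the lesson.

```python
import json


class SalesAggregator:
    def __init__(self):
        self.sales = {}

    def add_sale(self, sale_id: str, amount: float, date: str) -> None:
        self.sales[sale_id] = {"amount": amount, "date": date}

    def aggregate_sales(self, min_amount: float = 0) -> dict:
        total_sales = 0
        total_amount = 0.0
        for sale in self.sales.values():
            if sale["amount"] > min_amount:
                total_sales += 1
                total_amount += sale["amount"]
        return {"total_sales": total_sales, "total_amount": total_amount}

    def format_sales(self, min_amount: float = 0) -> str:
        filtered = [(sid, s) for sid, s in self.sales.items() if s["amount"] > min_amount]
        result = {
            "sales": [{"sale_id": sid, "amount": s["amount"], "date": s["date"]}
                      for sid, s in filtered],
            "statistics": self.aggregate_sales(min_amount),
        }
        return json.dumps(result)


aggregator = SalesAggregator()
aggregator.add_sale("001", 100.50, "2023-01-01")
aggregator.add_sale("002", 200.75, "2023-01-15")

payload = aggregator.format_sales(min_amount=150)  # a JSON string, not a dict
parsed = json.loads(payload)                       # back to Python objects
sale_ids = [sale["sale_id"] for sale in parsed["sales"]]

print(sale_ids)              # ['002']
print(parsed["statistics"])  # {'total_sales': 1, 'total_amount': 200.75}

# Human-readable variant of the same data, indented by two spaces.
pretty = json.dumps(parsed, indent=2)
print(pretty)
```

The compact form suits transport between programs, while the indented form is easier to read in logs or during debugging.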

Step 4: Implementing the 'get_sales_in_date_range' Method

Finally, let's implement the get_sales_in_date_range method:

    from datetime import datetime

    def get_sales_in_date_range(self, start_date: str, end_date: str) -> list[dict]:
        start = datetime.strptime(start_date, "%Y-%m-%d")
        end = datetime.strptime(end_date, "%Y-%m-%d")
        return [{"sale_id": sale_id, "amount": sale["amount"], "date": sale["date"]}
                for sale_id, sale in self.sales.items()
                if start <= datetime.strptime(sale["date"], "%Y-%m-%d") <= end]

    # Get sales in date range
    print(aggregator.get_sales_in_date_range('2023-01-01', '2023-12-31'))
    # Output: [{'sale_id': '001', 'amount': 100.5, 'date': '2023-01-01'}, {'sale_id': '002', 'amount': 200.75, 'date': '2023-01-15'}]

This method parses each date with datetime.strptime and returns all sales whose date falls between start_date and end_date, with both endpoints included.
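
To see the inclusive boundaries in action, the sketch below queries a range whose endpoints coincide exactly with two sale dates. Sale '003' and the March range are hypothetical values added for illustration; the `__future__` import only keeps the `list[dict]` annotation valid on older Python 3 versions.

```python
from __future__ import annotations  # keeps `list[dict]` valid on older Python 3

from datetime import datetime


class SalesAggregator:
    def __init__(self):
        self.sales = {}

    def add_sale(self, sale_id: str, amount: float, date: str) -> None:
        self.sales[sale_id] = {"amount": amount, "date": date}

    def get_sales_in_date_range(self, start_date: str, end_date: str) -> list[dict]:
        start = datetime.strptime(start_date, "%Y-%m-%d")
        end = datetime.strptime(end_date, "%Y-%m-%d")
        return [{"sale_id": sale_id, "amount": sale["amount"], "date": sale["date"]}
                for sale_id, sale in self.sales.items()
                if start <= datetime.strptime(sale["date"], "%Y-%m-%d") <= end]


aggregator = SalesAggregator()
aggregator.add_sale("001", 100.50, "2023-01-01")
aggregator.add_sale("002", 200.75, "2023-01-15")
aggregator.add_sale("003", 50.00, "2023-02-01")  # hypothetical sale outside the range

# Both endpoints match a sale date exactly, and both sales are returned.
in_range = aggregator.get_sales_in_date_range("2023-01-01", "2023-01-15")
ids = [sale["sale_id"] for sale in in_range]
print(ids)  # ['001', '002']

# A range with no matching sales yields an empty list.
empty = aggregator.get_sales_in_date_range("2023-03-01", "2023-03-31")
print(empty)  # []
```

Results come back in insertion order because Python dictionaries preserve it; the method does not sort by date.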

Lesson Summary

Congratulations! You've now extended a basic sales aggregator to an advanced one capable of filtering, aggregating, and formatting data in JSON. These skills are crucial for handling data streams efficiently, especially when dealing with large datasets. Feel free to experiment with similar challenges to reinforce your understanding. Well done, and see you in the next lesson!
