Mkdir700's Note

Mkdir700's Note

常见设计模式 Python 实现

24
2025-03-14

1.单例模式(Singleton Pattern)

题目描述:
实现一个数据库连接池管理器,确保系统中只有一个连接池实例,无论创建多少次都返回相同的实例。连接池应该支持获取连接和释放连接的功能。

要求:
1.实现一个 DatabaseConnectionPool 类,使用单例模式确保全局唯一
2.实现 get_connection()release_connection(connection) 方法
3.连接池应有最大连接数限制
4.提供一个测试函数,验证从不同地方获取的连接池是同一个实例

代码框架:

class DatabaseConnectionPool:
    # 请实现单例模式
    
    def __init__(self, max_connections=10):
        # 初始化连接池
        pass
    
    def get_connection(self):
        # 获取一个数据库连接
        pass
    
    def release_connection(self, connection):
        # 释放一个数据库连接回到池中
        pass

# 测试代码
def test_singleton_connection_pool():
    # 验证单例模式是否正确实现
    pass

参考答案:

class DatabaseConnectionPool:
    _instance = None
    
    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(DatabaseConnectionPool, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance
    
    def __init__(self, max_connections=10):
        # 只初始化一次
        if not self._initialized:
            self.max_connections = max_connections
            self.connections = []
            self.available_connections = []
            self.in_use_connections = set()
            self._initialized = True
            
            # 创建初始连接
            for _ in range(max_connections):
                connection = self._create_connection()
                self.connections.append(connection)
                self.available_connections.append(connection)
    
    def _create_connection(self):
        # 模拟创建数据库连接
        # 在实际应用中,这里会创建真正的数据库连接
        return {"id": id({}), "connected": True}
    
    def get_connection(self):
        if not self.available_connections:
            if len(self.connections) < self.max_connections:
                # 如果还没达到最大连接数,可以创建新连接
                connection = self._create_connection()
                self.connections.append(connection)
                self.available_connections.append(connection)
            else:
                raise Exception("No available connections in the pool")
        
        connection = self.available_connections.pop(0)
        self.in_use_connections.add(connection["id"])
        return connection
    
    def release_connection(self, connection):
        if connection["id"] in self.in_use_connections:
            self.in_use_connections.remove(connection["id"])
            self.available_connections.append(connection)
        else:
            raise Exception("Connection not in use or not from this pool")
    
    def status(self):
        return {
            "total": len(self.connections),
            "available": len(self.available_connections),
            "in_use": len(self.in_use_connections)
        }

# 测试代码
def test_singleton_connection_pool():
    # 创建两个连接池实例
    pool1 = DatabaseConnectionPool(max_connections=5)
    pool2 = DatabaseConnectionPool(max_connections=10)  # 这个参数会被忽略,因为已经初始化过了
    
    # 验证它们是同一个实例
    print(f"Is pool1 the same instance as pool2? {pool1 is pool2}")
    print(f"Pool max connections: {pool1.max_connections}")  # 应该是5,而不是10
    
    # 测试连接获取和释放
    conn1 = pool1.get_connection()
    conn2 = pool1.get_connection()
    print(f"Pool status after getting 2 connections: {pool1.status()}")
    
    pool1.release_connection(conn1)
    print(f"Pool status after releasing 1 connection: {pool1.status()}")
    
    # 验证从pool2获取的连接实际上是从同一个池中获取的
    conn3 = pool2.get_connection()
    print(f"Pool status after getting another connection from pool2: {pool1.status()}")
    
    # 清理
    pool2.release_connection(conn2)
    pool2.release_connection(conn3)
    print(f"Final pool status: {pool1.status()}")

if __name__ == "__main__":
    test_singleton_connection_pool()

2.工厂模式(Factory Pattern)

题目描述:
设计一个支持多种数据格式解析的文件处理系统。系统需要能够处理 CSV、JSON、XML 和 YAML 格式的文件,并将它们转换为统一的内部数据结构。

要求:
1.设计一个 FileParser 抽象基类,定义解析文件的接口
2.为每种文件格式实现具体的解析器类
3.实现一个 ParserFactory 工厂类,根据文件扩展名创建对应的解析器
4.编写测试代码,验证工厂能够正确创建不同类型的解析器

代码框架:

from abc import ABC, abstractmethod

class FileParser(ABC):
    @abstractmethod
    def parse(self, file_path):
        """解析文件并返回数据"""
        pass

class CSVParser(FileParser):
    # 实现CSV解析
    pass

class JSONParser(FileParser):
    # 实现JSON解析
    pass

class XMLParser(FileParser):
    # 实现XML解析
    pass

class YAMLParser(FileParser):
    # 实现YAML解析
    pass

class ParserFactory:
    # 实现工厂方法,根据文件扩展名创建对应的解析器
    pass

# 测试代码
def test_parser_factory():
    # 测试工厂是否能正确创建不同类型的解析器
    pass

参考答案:

from abc import ABC, abstractmethod
import csv
import json
import xml.etree.ElementTree as ET
import yaml
import os

class FileParser(ABC):
    @abstractmethod
    def parse(self, file_path):
        """解析文件并返回数据"""
        pass
    
    def validate_file(self, file_path):
        """验证文件是否存在"""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

class CSVParser(FileParser):
    def parse(self, file_path):
        self.validate_file(file_path)
        data = []
        with open(file_path, 'r', newline='') as file:
            csv_reader = csv.DictReader(file)
            for row in csv_reader:
                data.append(dict(row))
        return data

class JSONParser(FileParser):
    def parse(self, file_path):
        self.validate_file(file_path)
        with open(file_path, 'r') as file:
            return json.load(file)

class XMLParser(FileParser):
    def parse(self, file_path):
        self.validate_file(file_path)
        tree = ET.parse(file_path)
        root = tree.getroot()
        
        def element_to_dict(element):
            result = {}
            for child in element:
                if len(child) == 0:
                    result[child.tag] = child.text
                else:
                    result[child.tag] = element_to_dict(child)
            return result
        
        return element_to_dict(root)

class YAMLParser(FileParser):
    def parse(self, file_path):
        self.validate_file(file_path)
        with open(file_path, 'r') as file:
            return yaml.safe_load(file)

class ParserFactory:
    @staticmethod
    def create_parser(file_path):
        """根据文件扩展名创建对应的解析器"""
        _, extension = os.path.splitext(file_path)
        extension = extension.lower()
        
        if extension == '.csv':
            return CSVParser()
        elif extension == '.json':
            return JSONParser()
        elif extension == '.xml':
            return XMLParser()
        elif extension in ['.yaml', '.yml']:
            return YAMLParser()
        else:
            raise ValueError(f"Unsupported file extension: {extension}")

# 测试代码
def test_parser_factory():
    # 创建测试文件
    # CSV测试文件
    with open('test.csv', 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['name', 'age'])
        writer.writerow(['Alice', '30'])
        writer.writerow(['Bob', '25'])
    
    # JSON测试文件
    with open('test.json', 'w') as file:
        json.dump([{'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 25}], file)
    
    # XML测试文件
    with open('test.xml', 'w') as file:
        file.write('<data><person><name>Alice</name><age>30</age></person><person><name>Bob</name><age>25</age></person></data>')
    
    # YAML测试文件
    with open('test.yaml', 'w') as file:
        file.write("""
- name: Alice
  age: 30
- name: Bob
  age: 25
""")
    
    # 测试工厂创建解析器
    try:
        # 测试CSV解析器
        csv_parser = ParserFactory.create_parser('test.csv')
        assert isinstance(csv_parser, CSVParser)
        csv_data = csv_parser.parse('test.csv')
        print(f"CSV Parser result: {csv_data}")
        
        # 测试JSON解析器
        json_parser = ParserFactory.create_parser('test.json')
        assert isinstance(json_parser, JSONParser)
        json_data = json_parser.parse('test.json')
        print(f"JSON Parser result: {json_data}")
        
        # 测试XML解析器
        xml_parser = ParserFactory.create_parser('test.xml')
        assert isinstance(xml_parser, XMLParser)
        xml_data = xml_parser.parse('test.xml')
        print(f"XML Parser result: {xml_data}")
        
        # 测试YAML解析器
        yaml_parser = ParserFactory.create_parser('test.yaml')
        assert isinstance(yaml_parser, YAMLParser)
        yaml_data = yaml_parser.parse('test.yaml')
        print(f"YAML Parser result: {yaml_data}")
        
        # 测试不支持的文件类型
        try:
            ParserFactory.create_parser('test.txt')
            assert False, "Should raise ValueError for unsupported file extension"
        except ValueError:
            print("Successfully caught ValueError for unsupported file extension")
        
        print("All tests passed!")
    finally:
        # 清理测试文件
        for file_name in ['test.csv', 'test.json', 'test.xml', 'test.yaml']:
            if os.path.exists(file_name):
                os.remove(file_name)

if __name__ == "__main__":
    test_parser_factory()

3.观察者模式(Observer Pattern)

题目描述:
实现一个股票市场监控系统,当股票价格变化时通知注册的观察者。系统需要支持多种类型的观察者,如邮件通知、短信通知和应用内通知。

要求:
1.设计一个 Subject 接口,定义注册、移除和通知观察者的方法
2.实现一个 StockMarket 类作为具体主题,管理股票价格并在价格变化时通知观察者
3.设计一个 Observer 接口,定义更新方法
4.实现至少三种不同的观察者:EmailNotifierSMSNotifierAppNotifier
5.编写测试代码,验证当股票价格变化时,所有注册的观察者都能收到通知

代码框架:

from abc import ABC, abstractmethod

class Observer(ABC):
    @abstractmethod
    def update(self, stock_name, price):
        """接收股票价格更新的通知"""
        pass

class Subject(ABC):
    @abstractmethod
    def register_observer(self, observer):
        """注册观察者"""
        pass
    
    @abstractmethod
    def remove_observer(self, observer):
        """移除观察者"""
        pass
    
    @abstractmethod
    def notify_observers(self):
        """通知所有观察者"""
        pass

class StockMarket(Subject):
    # 实现股票市场类,管理股票价格并通知观察者
    pass

class EmailNotifier(Observer):
    # 实现邮件通知观察者
    pass

class SMSNotifier(Observer):
    # 实现短信通知观察者
    pass

class AppNotifier(Observer):
    # 实现应用内通知观察者
    pass

# 测试代码
def test_stock_market_observers():
    # 测试股票价格变化时是否正确通知所有观察者
    pass

参考答案:

from abc import ABC, abstractmethod

class Observer(ABC):
    @abstractmethod
    def update(self, stock_name, price):
        """接收股票价格更新的通知"""
        pass

class Subject(ABC):
    @abstractmethod
    def register_observer(self, observer):
        """注册观察者"""
        pass
    
    @abstractmethod
    def remove_observer(self, observer):
        """移除观察者"""
        pass
    
    @abstractmethod
    def notify_observers(self):
        """通知所有观察者"""
        pass

class StockMarket(Subject):
    def __init__(self):
        self.observers = []
        self.stocks = {}  # 股票名称 -> 价格
    
    def register_observer(self, observer):
        if observer not in self.observers:
            self.observers.append(observer)
    
    def remove_observer(self, observer):
        if observer in self.observers:
            self.observers.remove(observer)
    
    def notify_observers(self):
        for stock_name, price in self.stocks.items():
            for observer in self.observers:
                observer.update(stock_name, price)
    
    def set_stock_price(self, stock_name, price):
        """设置股票价格,如果价格变化则通知观察者"""
        old_price = self.stocks.get(stock_name)
        if old_price != price:
            self.stocks[stock_name] = price
            print(f"Stock price updated: {stock_name} = {price}")
            self.notify_observers()
    
    def get_stock_price(self, stock_name):
        """获取股票价格"""
        return self.stocks.get(stock_name)

class EmailNotifier(Observer):
    def __init__(self, email_address):
        self.email_address = email_address
    
    def update(self, stock_name, price):
        # 在实际应用中,这里会发送真正的邮件
        print(f"Sending email to {self.email_address}: {stock_name} stock price updated to {price}")

class SMSNotifier(Observer):
    def __init__(self, phone_number):
        self.phone_number = phone_number
    
    def update(self, stock_name, price):
        # 在实际应用中,这里会发送真正的短信
        print(f"Sending SMS to {self.phone_number}: {stock_name} stock price updated to {price}")

class AppNotifier(Observer):
    def __init__(self, user_id):
        self.user_id = user_id
    
    def update(self, stock_name, price):
        # 在实际应用中,这里会发送应用内推送
        print(f"Sending in-app notification to user {self.user_id}: {stock_name} stock price updated to {price}")

# 测试代码
def test_stock_market_observers():
    # 创建股票市场
    market = StockMarket()
    
    # 创建观察者
    email_notifier = EmailNotifier("user@example.com")
    sms_notifier = SMSNotifier("+1234567890")
    app_notifier = AppNotifier("user123")
    
    # 注册观察者
    market.register_observer(email_notifier)
    market.register_observer(sms_notifier)
    market.register_observer(app_notifier)
    
    # 设置股票价格,应该触发通知
    print("\nSetting initial stock prices:")
    market.set_stock_price("AAPL", 150.0)
    market.set_stock_price("GOOGL", 2800.0)
    
    # 更新股票价格,应该触发通知
    print("\nUpdating stock prices:")
    market.set_stock_price("AAPL", 152.5)
    
    # 移除一个观察者
    print("\nRemoving SMS notifier:")
    market.remove_observer(sms_notifier)
    
    # 再次更新股票价格,只有剩余的观察者应该收到通知
    print("\nUpdating stock prices after removing an observer:")
    market.set_stock_price("GOOGL", 2850.0)
    
    # 设置相同的价格,不应该触发通知
    print("\nSetting the same price(should not trigger notifications):")
    market.set_stock_price("AAPL", 152.5)

if __name__ == "__main__":
    test_stock_market_observers()

4.策略模式(Strategy Pattern)

题目描述:
设计一个文本处理系统,支持多种不同的文本压缩算法。系统应该能够在运行时动态选择不同的压缩策略,如 Gzip、Bzip2、LZMA 等。

要求:
1.设计一个 CompressionStrategy 接口,定义压缩和解压方法
2.实现至少三种不同的压缩策略:GzipCompressionBzip2CompressionLZMACompression
3.实现一个 TextProcessor 类,使用策略模式处理文本压缩
4.编写测试代码,验证系统能够使用不同的压缩策略处理文本

代码框架:

from abc import ABC, abstractmethod

class CompressionStrategy(ABC):
    @abstractmethod
    def compress(self, data):
        """压缩数据"""
        pass
    
    @abstractmethod
    def decompress(self, data):
        """解压数据"""
        pass

class GzipCompression(CompressionStrategy):
    # 实现Gzip压缩策略
    pass

class Bzip2Compression(CompressionStrategy):
    # 实现Bzip2压缩策略
    pass

class LZMACompression(CompressionStrategy):
    # 实现LZMA压缩策略
    pass

class TextProcessor:
    # 实现文本处理器,使用策略模式处理文本压缩
    pass

# 测试代码
def test_compression_strategies():
    # 测试不同的压缩策略
    pass

参考答案:

from abc import ABC, abstractmethod
import gzip
import bz2
import lzma
import os

class CompressionStrategy(ABC):
    @abstractmethod
    def compress(self, data):
        """压缩数据"""
        pass
    
    @abstractmethod
    def decompress(self, data):
        """解压数据"""
        pass

class GzipCompression(CompressionStrategy):
    def compress(self, data):
        if isinstance(data, str):
            data = data.encode('utf-8')
        return gzip.compress(data)
    
    def decompress(self, data):
        decompressed_data = gzip.decompress(data)
        try:
            # 尝试将字节转换为字符串
            return decompressed_data.decode('utf-8')
        except UnicodeDecodeError:
            # 如果无法解码,则返回原始字节
            return decompressed_data

class Bzip2Compression(CompressionStrategy):
    def compress(self, data):
        if isinstance(data, str):
            data = data.encode('utf-8')
        return bz2.compress(data)
    
    def decompress(self, data):
        decompressed_data = bz2.decompress(data)
        try:
            return decompressed_data.decode('utf-8')
        except UnicodeDecodeError:
            return decompressed_data

class LZMACompression(CompressionStrategy):
    def compress(self, data):
        if isinstance(data, str):
            data = data.encode('utf-8')
        return lzma.compress(data)
    
    def decompress(self, data):
        decompressed_data = lzma.decompress(data)
        try:
            return decompressed_data.decode('utf-8')
        except UnicodeDecodeError:
            return decompressed_data

class TextProcessor:
    def __init__(self, compression_strategy=None):
        self.compression_strategy = compression_strategy
    
    def set_compression_strategy(self, compression_strategy):
        """设置压缩策略"""
        self.compression_strategy = compression_strategy
    
    def compress_text(self, text):
        """使用当前策略压缩文本"""
        if not self.compression_strategy:
            raise ValueError("Compression strategy not set")
        return self.compression_strategy.compress(text)
    
    def decompress_text(self, compressed_data):
        """使用当前策略解压文本"""
        if not self.compression_strategy:
            raise ValueError("Compression strategy not set")
        return self.compression_strategy.decompress(compressed_data)
    
    def compress_file(self, input_file_path, output_file_path):
        """压缩文件"""
        if not self.compression_strategy:
            raise ValueError("Compression strategy not set")
        
        with open(input_file_path, 'rb') as input_file:
            data = input_file.read()
        
        compressed_data = self.compression_strategy.compress(data)
        
        with open(output_file_path, 'wb') as output_file:
            output_file.write(compressed_data)
    
    def decompress_file(self, input_file_path, output_file_path):
        """解压文件"""
        if not self.compression_strategy:
            raise ValueError("Compression strategy not set")
        
        with open(input_file_path, 'rb') as input_file:
            compressed_data = input_file.read()
        
        decompressed_data = self.compression_strategy.decompress(compressed_data)
        
        # 根据解压结果类型决定写入模式
        mode = 'w' if isinstance(decompressed_data, str) else 'wb'
        with open(output_file_path, mode) as output_file:
            output_file.write(decompressed_data)

# 测试代码
def test_compression_strategies():
    # 创建测试文本
    test_text = "This is a test text that will be compressed using different strategies." * 100
    
    # 创建测试文件
    with open('test_input.txt', 'w') as file:
        file.write(test_text)
    
    # 创建压缩策略
    gzip_strategy = GzipCompression()
    bzip2_strategy = Bzip2Compression()
    lzma_strategy = LZMACompression()
    
    # 创建文本处理器
    processor = TextProcessor()
    
    # 测试Gzip压缩
    processor.set_compression_strategy(gzip_strategy)
    gzip_compressed = processor.compress_text(test_text)
    gzip_decompressed = processor.decompress_text(gzip_compressed)
    processor.compress_file('test_input.txt', 'test_gzip.gz')
    processor.decompress_file('test_gzip.gz', 'test_gzip_decompressed.txt')
    
    # 测试 Bzip2 压缩
    processor.set_compression_strategy(bzip2_strategy)
    bzip2_compressed = processor.compress_text(test_text)
    bzip2_decompressed = processor.decompress_text(bzip2_compressed)
    processor.compress_file('test_input.txt', 'test_bzip2.bz2')
    processor.decompress_file('test_bzip2.bz2', 'test_bzip2_decompressed.txt')
    
    # 测试 LZMA 压缩
    processor.set_compression_strategy(lzma_strategy)
    lzma_compressed = processor.compress_text(test_text)
    lzma_decompressed = processor.decompress_text(lzma_compressed)
    processor.compress_file('test_input.txt', 'test_lzma.xz')
    processor.decompress_file('test_lzma.xz', 'test_lzma_decompressed.txt')
    
    # 打印压缩结果
    print(f"Original text size: {len(test_text)} bytes")
    print(f"Gzip compressed size: {len(gzip_compressed)} bytes")
    print(f"Bzip2 compressed size: {len(bzip2_compressed)} bytes")
    print(f"LZMA compressed size: {len(lzma_compressed)} bytes")
    
    # 验证解压结果
    assert gzip_decompressed == test_text
    assert bzip2_decompressed == test_text
    assert lzma_decompressed == test_text
    print("All decompression tests passed!")
    
    # 验证文件压缩和解压
    with open('test_gzip_decompressed.txt', 'r') as file:
        assert file.read() == test_text
    with open('test_bzip2_decompressed.txt', 'r') as file:
        assert file.read() == test_text
    with open('test_lzma_decompressed.txt', 'r') as file:
        assert file.read() == test_text
    print("All file compression/decompression tests passed!")
    
    # 清理测试文件
    for file_name in ['test_input.txt', 'test_gzip.gz', 'test_gzip_decompressed.txt',
                      'test_bzip2.bz2', 'test_bzip2_decompressed.txt',
                      'test_lzma.xz', 'test_lzma_decompressed.txt']:
        if os.path.exists(file_name):
            os.remove(file_name)

if __name__ == "__main__":
    test_compression_strategies()

5.装饰器模式(Decorator Pattern)

题目描述:
设计一个咖啡订购系统,允许顾客选择基础咖啡类型(如美式咖啡、拿铁等),然后添加各种配料(如牛奶、糖、巧克力等)。系统需要能够动态计算最终价格和描述。

要求:
1.设计一个Coffee抽象基类,定义获取价格和描述的方法
2.实现几种基础咖啡类型,如EspressoAmericanoLatte
3.设计一个CoffeeDecorator抽象装饰器类
4.实现几种配料装饰器,如MilkDecoratorSugarDecoratorChocolateDecorator
5.编写测试代码,验证系统能够正确计算添加不同配料的咖啡价格和描述

代码框架:

from abc import ABC, abstractmethod

class Coffee(ABC):
    @abstractmethod
    def get_cost(self):
        """获取咖啡价格"""
        pass
    
    @abstractmethod
    def get_description(self):
        """获取咖啡描述"""
        pass

class Espresso(Coffee):
    # 实现浓缩咖啡
    pass

class Americano(Coffee):
    # 实现美式咖啡
    pass

class Latte(Coffee):
    # 实现拿铁咖啡
    pass

class CoffeeDecorator(Coffee):
    # 实现咖啡装饰器抽象类
    pass

class MilkDecorator(CoffeeDecorator):
    # 实现牛奶装饰器
    pass

class SugarDecorator(CoffeeDecorator):
    # 实现糖装饰器
    pass

class ChocolateDecorator(CoffeeDecorator):
    # 实现巧克力

参考答案:

from abc import ABC, abstractmethod

class Coffee(ABC):
    @abstractmethod
    def get_cost(self):
        """获取咖啡价格"""
        pass
    
    @abstractmethod
    def get_description(self):
        """获取咖啡描述"""
        pass

class Espresso(Coffee):
    def get_cost(self):
        return 1.99
    
    def get_description(self):
        return "Espresso"

class Americano(Coffee):
    def get_cost(self):
        return 2.49
    
    def get_description(self):
        return "Americano"

class Latte(Coffee):
    def get_cost(self):
        return 2.99
    
    def get_description(self):
        return "Latte"

class CoffeeDecorator(Coffee):
    def __init__(self, coffee):
        self.coffee = coffee
    
    def get_cost(self):
        return self.coffee.get_cost()
    
    def get_description(self):
        return self.coffee.get_description()

class MilkDecorator(CoffeeDecorator):
    def get_cost(self):
        return self.coffee.get_cost() + 0.50
    
    def get_description(self):
        return f"{self.coffee.get_description()}, Milk"

class SugarDecorator(CoffeeDecorator):
    def get_cost(self):
        return self.coffee.get_cost() + 0.25
    
    def get_description(self):
        return f"{self.coffee.get_description()}, Sugar"

class ChocolateDecorator(CoffeeDecorator):
    def get_cost(self):
        return self.coffee.get_cost() + 0.75
    
    def get_description(self):
        return f"{self.coffee.get_description()}, Chocolate"

class WhippedCreamDecorator(CoffeeDecorator):
    def get_cost(self):
        return self.coffee.get_cost() + 0.60
    
    def get_description(self):
        return f"{self.coffee.get_description()}, Whipped Cream"

# 测试代码
def test_coffee_decorators():
    # 创建基础咖啡
    espresso = Espresso()
    americano = Americano()
    latte = Latte()
    
    # 测试基础咖啡
    print(f"{espresso.get_description()}: ${espresso.get_cost():.2f}")
    print(f"{americano.get_description()}: ${americano.get_cost():.2f}")
    print(f"{latte.get_description()}: ${latte.get_cost():.2f}")
    
    # 添加配料
    espresso_with_milk = MilkDecorator(espresso)
    print(f"{espresso_with_milk.get_description()}: ${espresso_with_milk.get_cost():.2f}")
    
    americano_with_sugar = SugarDecorator(americano)
    print(f"{americano_with_sugar.get_description()}: ${americano_with_sugar.get_cost():.2f}")
    
    # 多重装饰
    special_latte = ChocolateDecorator(WhippedCreamDecorator(MilkDecorator(latte)))
    print(f"{special_latte.get_description()}: ${special_latte.get_cost():.2f}")
    
    # 创建自定义咖啡
    custom_coffee = SugarDecorator(MilkDecorator(ChocolateDecorator(Espresso())))
    print(f"Custom coffee: {custom_coffee.get_description()}: ${custom_coffee.get_cost():.2f}")

if __name__ == "__main__":
    test_coffee_decorators()

6.适配器模式(Adapter Pattern)

题目描述:
你正在开发一个支持多种支付方式的电子商务系统。系统已经有一个标准的支付接口,但现在需要集成一个第三方支付 API,其接口与系统现有接口不兼容。请使用适配器模式解决这个问题。

要求:
1.系统有一个现有的 PaymentProcessor 接口,定义了 process_payment 方法
2.已经实现了几种支付处理器,如 CreditCardProcessorPayPalProcessor
3.第三方支付 API 有一个 ThirdPartyPaymentAPI 类,提供了 authorize_paymentcapture_payment 方法
4.实现一个适配器 ThirdPartyPaymentAdapter,使第三方 API 符合系统的支付接口
5.编写测试代码,验证适配器能够正确工作

代码框架:

from abc import ABC, abstractmethod

class PaymentProcessor(ABC):
    @abstractmethod
    def process_payment(self, amount):
        """处理支付"""
        pass

class CreditCardProcessor(PaymentProcessor):
    # 实现信用卡支付处理器
    pass

class PayPalProcessor(PaymentProcessor):
    # 实现PayPal支付处理器
    pass

class ThirdPartyPaymentAPI:
    # 第三方支付API,接口与系统不兼容
    def authorize_payment(self, amount):
        # 授权支付
        pass
    
    def capture_payment(self, authorization_id):
        # 捕获支付
        pass

class ThirdPartyPaymentAdapter(PaymentProcessor):
    # 实现适配器,使第三方API符合系统接口
    pass

# 测试代码
def test_payment_adapter():
    # 测试适配器是否能正确处理支付
    pass

参考答案:

from abc import ABC, abstractmethod
import uuid

class PaymentProcessor(ABC):
    @abstractmethod
    def process_payment(self, amount):
        """处理支付"""
        pass

class CreditCardProcessor(PaymentProcessor):
    def __init__(self, card_number, expiry_date, cvv):
        self.card_number = card_number
        self.expiry_date = expiry_date
        self.cvv = cvv
    
    def process_payment(self, amount):
        # 在实际应用中,这里会调用信用卡处理API
        print(f"Processing credit card payment of ${amount:.2f}")
        print(f"Card details: {self.card_number[-4:]}(expires: {self.expiry_date})")
        return {"success": True, "transaction_id": str(uuid.uuid4())}

class PayPalProcessor(PaymentProcessor):
    def __init__(self, email):
        self.email = email
    
    def process_payment(self, amount):
        # 在实际应用中,这里会调用PayPal API
        print(f"Processing PayPal payment of ${amount:.2f}")
        print(f"PayPal account: {self.email}")
        return {"success": True, "transaction_id": str(uuid.uuid4())}

class ThirdPartyPaymentAPI:
    def __init__(self, api_key):
        self.api_key = api_key
    
    def authorize_payment(self, amount):
        # 在实际应用中,这里会调用第三方API进行授权
        print(f"Third-party API: Authorizing payment of ${amount:.2f}")
        print(f"Using API key: {self.api_key[:5]}...")
        # 返回授权ID
        return {"authorization_id": str(uuid.uuid4()), "status": "authorized"}
    
    def capture_payment(self, authorization_id):
        # 在实际应用中,这里会调用第三方API捕获已授权的支付
        print(f"Third-party API: Capturing payment with authorization ID: {authorization_id}")
        # 返回交易ID
        return {"transaction_id": str(uuid.uuid4()), "status": "completed"}

class ThirdPartyPaymentAdapter(PaymentProcessor):
    def __init__(self, third_party_api):
        self.third_party_api = third_party_api
    
    def process_payment(self, amount):
        # 使用适配器模式将第三方API的两步流程适配为系统的单步流程
        # 1.首先授权支付
        auth_result = self.third_party_api.authorize_payment(amount)
        
        if auth_result["status"] != "authorized":
            return {"success": False, "error": "Payment authorization failed"}
        
        # 2.然后捕获支付
        capture_result = self.third_party_api.capture_payment(auth_result["authorization_id"])
        
        if capture_result["status"] != "completed":
            return {"success": False, "error": "Payment capture failed"}
        
        # 3.返回符合系统接口的结果
        return {"success": True, "transaction_id": capture_result["transaction_id"]}

# 测试代码
def test_payment_adapter():
    # 创建支付处理器
    credit_card = CreditCardProcessor("4111-1111-1111-1111", "12/25", "123")
    paypal = PayPalProcessor("customer@example.com")
    
    # 创建第三方API和适配器
    third_party_api = ThirdPartyPaymentAPI("api_key_12345abcde")
    third_party_adapter = ThirdPartyPaymentAdapter(third_party_api)
    
    # 测试信用卡支付
    print("\n=== Testing Credit Card Payment ===")
    cc_result = credit_card.process_payment(100.00)
    print(f"Result: {cc_result}")
    
    # 测试PayPal支付
    print("\n=== Testing PayPal Payment ===")
    paypal_result = paypal.process_payment(75.50)
    print(f"Result: {paypal_result}")
    
    # 测试第三方API适配器
    print("\n=== Testing Third-party Payment Adapter ===")
    adapter_result = third_party_adapter.process_payment(120.75)
    print(f"Result: {adapter_result}")
    
    # 验证所有支付处理器都实现了相同的接口
    processors = [credit_card, paypal, third_party_adapter]
    print("\n=== Verifying Common Interface ===")
    for i, processor in enumerate(processors, 1):
        print(f"Processor {i} is a PaymentProcessor: {isinstance(processor, PaymentProcessor)}")
        result = processor.process_payment(50.00)
        print(f"Processor {i} result has 'success' key: {'success' in result}")
        print(f"Processor {i} result has 'transaction_id' key: {'transaction_id' in result}")

if __name__ == "__main__":
    test_payment_adapter()

7.命令模式(Command Pattern)

题目描述:
设计一个简单的文本编辑器,支持多种操作(如插入文本、删除文本、复制文本等),并能够实现撤销和重做功能。

要求:
1.设计一个 Command 接口,定义执行和撤销方法
2.实现几种具体命令,如 InsertTextCommandDeleteTextCommandCopyTextCommand
3.实现一个 TextEditor 类作为接收者,提供基本的文本操作
4.实现一个 EditorInvoker 类,管理命令的执行、撤销和重做
5.编写测试代码,验证编辑器能够正确执行各种操作,并支持撤销和重做

代码框架:

from abc import ABC, abstractmethod

class Command(ABC):
    @abstractmethod
    def execute(self):
        """执行命令"""
        pass
    
    @abstractmethod
    def undo(self):
        """撤销命令"""
        pass

class TextEditor:
    # 实现文本编辑器,提供基本的文本操作
    pass

class InsertTextCommand(Command):
    # 实现插入文本命令
    pass

class DeleteTextCommand(Command):
    # 实现删除文本命令
    pass

class CopyTextCommand(Command):
    # 实现复制文本命令
    pass

class EditorInvoker:
    # 实现编辑器调用者,管理命令的执行、撤销和重做
    pass

# 测试代码
def test_text_editor_commands():
    # 测试编辑器的各种操作和撤销/重做功能
    pass

参考答案:

from abc import ABC, abstractmethod

class Command(ABC):
    @abstractmethod
    def execute(self):
        """执行命令"""
        pass
    
    @abstractmethod
    def undo(self):
        """撤销命令"""
        pass

class TextEditor:
    def __init__(self):
        self.text = ""
        self.clipboard = ""
        self.cursor_position = 0
    
    def insert_text(self, position, text):
        """在指定位置插入文本"""
        if position < 0 or position > len(self.text):
            raise ValueError("Invalid position")
        
        self.text = self.text[:position] + text + self.text[position:]
        self.cursor_position = position + len(text)
        return text
    
    def delete_text(self, start, end):
        """删除指定范围的文本"""
        if start < 0 or end > len(self.text) or start > end:
            raise ValueError("Invalid range")
        
        deleted_text = self.text[start:end]
        self.text = self.text[:start] + self.text[end:]
        self.cursor_position = start
        return deleted_text
    
    def copy_text(self, start, end):
        """复制指定范围的文本到剪贴板"""
        if start < 0 or end > len(self.text) or start > end:
            raise ValueError("Invalid range")
        
        self.clipboard = self.text[start:end]
        self.cursor_position = end
        return self.clipboard
    
    def paste_text(self, position):
        """在指定位置粘贴剪贴板内容"""
        if position < 0 or position > len(self.text):
            raise ValueError("Invalid position")
        
        return self.insert_text(position, self.clipboard)
    
    def get_text(self):
        """获取当前文本内容"""
        return self.text
    
    def get_cursor_position(self):
        """获取当前光标位置"""
        return self.cursor_position
    
    def get_clipboard(self):
        """获取剪贴板内容"""
        return self.clipboard

class InsertTextCommand(Command):
    def __init__(self, editor, position, text):
        self.editor = editor
        self.position = position
        self.text = text
        self.inserted_text = None
    
    def execute(self):
        self.inserted_text = self.editor.insert_text(self.position, self.text)
        return True
    
    def undo(self):
        if self.inserted_text:
            self.editor.delete_text(self.position, self.position + len(self.inserted_text))
            return True
        return False

class DeleteTextCommand(Command):
    def __init__(self, editor, start, end):
        self.editor = editor
        self.start = start
        self.end = end
        self.deleted_text = None
    
    def execute(self):
        self.deleted_text = self.editor.delete_text(self.start, self.end)
        return True
    
    def undo(self):
        if self.deleted_text:
            self.editor.insert_text(self.start, self.deleted_text)
            return True
        return False

class CopyTextCommand(Command):
    def __init__(self, editor, start, end):
        self.editor = editor
        self.start = start
        self.end = end
        self.previous_clipboard = None
    
    def execute(self):
        self.previous_clipboard = self.editor.get_clipboard()
        self.editor.copy_text(self.start, self.end)
        return True
    
    def undo(self):
        # 恢复之前的剪贴板内容
        if self.previous_clipboard is not None:
            self.editor.clipboard = self.previous_clipboard
            return True
        return False

class PasteTextCommand(Command):
    def __init__(self, editor, position):
        self.editor = editor
        self.position = position
        self.pasted_text = None
    
    def execute(self):
        self.pasted_text = self.editor.paste_text(self.position)
        return True
    
    def undo(self):
        if self.pasted_text:
            self.editor.delete_text(self.position, self.position + len(self.pasted_text))
            return True
        return False

class EditorInvoker:
    def __init__(self):
        self.command_history = []
        self.redo_stack = []
    
    def execute_command(self, command):
        """执行命令并添加到历史记录"""
        if command.execute():
            self.command_history.append(command)
            self.redo_stack.clear()  # 执行新命令后清空重做栈
            return True
        return False
    
    def undo(self):
        """撤销最后一个命令"""
        if self.command_history:
            command = self.command_history.pop()
            if command.undo():
                self.redo_stack.append(command)
                return True
        return False
    
    def redo(self):
        """重做最后一个撤销的命令"""
        if self.redo_stack:
            command = self.redo_stack.pop()
            if command.execute():
                self.command_history.append(command)
                return True
        return False
    
    def get_history_size(self):
        """获取历史记录大小"""
        return len(self.command_history)
    
    def get_redo_stack_size(self):
        """获取重做栈大小"""
        return len(self.redo_stack)

# 测试代码
def test_text_editor_commands():
    # 创建编辑器和调用者
    editor = TextEditor()
    invoker = EditorInvoker()
    
    # 测试插入命令
    print("=== Testing Insert Command ===")
    insert_cmd = InsertTextCommand(editor, 0, "Hello, ")
    invoker.execute_command(insert_cmd)
    print(f"Text after insert: '{editor.get_text()}'")
    
    insert_cmd2 = InsertTextCommand(editor, 7, "world!")
    invoker.execute_command(insert_cmd2)
    print(f"Text after second insert: '{editor.get_text()}'")
    
    # 测试复制命令
    print("\n=== Testing Copy Command ===")
    copy_cmd = CopyTextCommand(editor, 7, 12)
    invoker.execute_command(copy_cmd)
    print(f"Clipboard after copy: '{editor.get_clipboard()}'")
    
    # 测试粘贴命令
    print("\n=== Testing Paste Command ===")
    paste_cmd = PasteTextCommand(editor, 13)
    invoker.execute_command(paste_cmd)
    print(f"Text after paste: '{editor.get_text()}'")
    
    # 测试删除命令
    print("\n=== Testing Delete Command ===")
    delete_cmd = DeleteTextCommand(editor, 7, 12)
    invoker.execute_command(delete_cmd)
    print(f"Text after delete: '{editor.get_text()}'")
    
    # 测试撤销
    print("\n=== Testing Undo ===")
    invoker.undo()  # 撤销删除
    print(f"Text after undo delete: '{editor.get_text()}'")
    
    invoker.undo()  # 撤销粘贴
    print(f"Text after undo paste: '{editor.get_text()}'")
    
    # 测试重做
    print("\n=== Testing Redo ===")
    invoker.redo()  # 重做粘贴
    print(f"Text after redo paste: '{editor.get_text()}'")
    
    # 测试多次撤销和重做
    print("\n=== Testing Multiple Undo/Redo ===")
    invoker.undo()  # 撤销粘贴
    invoker.undo()  # 撤销复制
    invoker.undo()  # 撤销第二次插入
    print(f"Text after multiple undos: '{editor.get_text()}'")
    
    invoker.redo()  # 重做第二次插入
    print(f"Text after redo insert: '{editor.get_text()}'")
    
    # 测试在撤销后执行新命令
    print("\n=== Testing New Command After Undo ===")
    insert_cmd3 = InsertTextCommand(editor, 7, "Python!")
    invoker.execute_command(insert_cmd3)
    print(f"Text after new insert: '{editor.get_text()}'")
    print(f"Redo stack size(should be 0): {invoker.get_redo_stack_size()}")

if __name__ == "__main__":
    test_text_editor_commands()

8.组合模式(Composite Pattern)

题目描述:
设计一个文件系统结构,支持文件和目录的层次结构。系统需要能够计算目录大小(包括所有子文件和子目录的大小总和),并支持搜索文件。

要求:
1.设计一个 FileSystemComponent 抽象基类,定义获取名称、大小和搜索方法
2.实现 File 类作为叶节点,表示单个文件
3.实现 Directory 类作为组合节点,可以包含文件和子目录
4.目录的大小应该是其包含的所有文件和子目录大小的总和
5.实现搜索功能,能够在整个文件系统中查找指定名称的文件
6.编写测试代码,验证系统能够正确计算目录大小和搜索文件

代码框架:

from abc import ABC, abstractmethod

class FileSystemComponent(ABC):
    @abstractmethod
    def get_name(self):
        """获取名称"""
        pass
    
    @abstractmethod
    def get_size(self):
        """获取大小"""
        pass
    
    @abstractmethod
    def search(self, name):
        """搜索文件"""
        pass

class File(FileSystemComponent):
    # 实现文件类
    pass

class Directory(FileSystemComponent):
    # 实现目录类
    pass

# 测试代码
def test_file_system():
    # 测试文件系统的目录大小计算和文件搜索功能
    pass

参考答案:

from abc import ABC, abstractmethod

class FileSystemComponent(ABC):
    def __init__(self, name):
        self.name = name
    
    def get_name(self):
        """获取名称"""
        return self.name
    
    @abstractmethod
    def get_size(self):
        """获取大小"""
        pass
    
    @abstractmethod
    def search(self, name):
        """搜索文件"""
        pass
    
    @abstractmethod
    def print_structure(self, indent=""):
        """打印结构"""
        pass

class File(FileSystemComponent):
    def __init__(self, name, size):
        super().__init__(name)
        self.size = size
    
    def get_size(self):
        return self.size
    
    def search(self, name):
        """搜索文件,如果名称匹配则返回自身,否则返回空列表"""
        if name in self.name:
            return [self]
        return []
    
    def print_structure(self, indent=""):
        """打印文件结构"""
        print(f"{indent}- {self.name}({self.size} bytes)")

class Directory(FileSystemComponent):
    def __init__(self, name):
        super().__init__(name)
        self.children = []
    
    def add(self, component):
        """添加子组件(文件或目录)"""
        self.children.append(component)
    
    def remove(self, component):
        """移除子组件"""
        self.children.remove(component)
    
    def get_size(self):
        """计算目录大小(所有子组件大小的总和)"""
        total_size = 0
        for child in self.children:
            total_size += child.get_size()
        return total_size
    
    def search(self, name):
        """搜索文件,返回所有匹配的文件列表"""
        result = []
        # 如果目录名称匹配,也将其添加到结果中
        if name in self.name:
            result.append(self)
        
        # 在所有子组件中搜索
        for child in self.children:
            result.extend(child.search(name))
        
        return result
    
    def print_structure(self, indent=""):
        """打印目录结构"""
        print(f"{indent}+ {self.name}({self.get_size()} bytes)")
        for child in self.children:
            child.print_structure(indent + "  ")

# 测试代码
def test_file_system():
    # 创建根目录
    root = Directory("root")
    
    # 创建子目录
    documents = Directory("documents")
    pictures = Directory("pictures")
    
    # 添加子目录到根目录
    root.add(documents)
    root.add(pictures)
    
    # 创建文件并添加到目录
    file1 = File("report.docx", 2048)
    file2 = File("presentation.pptx", 4096)
    documents.add(file1)
    documents.add(file2)
    
    file3 = File("vacation.jpg", 3072)
    file4 = File("family.jpg", 2560)
    pictures.add(file3)
    pictures.add(file4)
    
    # 创建子目录并添加文件
    work = Directory("work")
    documents.add(work)
    file5 = File("project_plan.docx", 1536)
    work.add(file5)
    
    # 打印文件系统结构
    print("=== File System Structure ===")
    root.print_structure()
    
    # 测试大小计算
    print("\n=== Size Calculations ===")
    print(f"Size of root: {root.get_size()} bytes")
    print(f"Size of documents: {documents.get_size()} bytes")
    print(f"Size of pictures: {pictures.get_size()} bytes")
    print(f"Size of work: {work.get_size()} bytes")
    
    # 测试文件搜索
    print("\n=== File Search ===")
    search_term = "docx"
    results = root.search(search_term)
    print(f"Search results for '{search_term}': ")
    for item in results:
        print(f"- {item.get_name()}({item.get_size()} bytes)")
    
    search_term = "jpg"
    results = root.search(search_term)
    print(f"\nSearch results for '{search_term}': ")
    for item in results:
        print(f"- {item.get_name()}({item.get_size()} bytes)")
    
    search_term = "work"
    results = root.search(search_term)
    print(f"\nSearch results for '{search_term}': ")
    for item in results:
        print(f"- {item.get_name()}({item.get_size()} bytes)")
    
    # 测试移除组件
    print("\n=== Removing Components ===")
    documents.remove(file1)
    print("Removed report.docx from documents")
    root.print_structure()

if __name__ == "__main__":
    test_file_system()

9.状态模式(State Pattern)

题目描述:
设计一个自动售货机系统,根据不同的状态(如无货、有货、已投币、已选择商品等)展示不同的行为。

要求:
1.设计一个 VendingMachineState 接口,定义各种操作方法
2.实现几种具体状态类,如 NoItemStateHasItemStateHasCoinStateDispenseState
3.实现一个 VendingMachine 类,根据当前状态执行不同的行为
4.状态应该能够自动转换,如投币后从 HasItemState 转换到 HasCoinState
5.编写测试代码,验证售货机在不同状态下的行为是否正确

代码框架:

from abc import ABC, abstractmethod

class VendingMachineState(ABC):
    @abstractmethod
    def insert_coin(self, machine):
        """投入硬币"""
        pass
    
    @abstractmethod
    def select_item(self, machine, item_code):
        """选择商品"""
        pass
    
    @abstractmethod
    def dispense_item(self, machine):
        """发放商品"""
        pass
    
    @abstractmethod
    def return_coin(self, machine):
        """退回硬币"""
        pass

class NoItemState(VendingMachineState):
    # 实现无货状态
    pass

class HasItemState(VendingMachineState):
    # 实现有货状态
    pass

class HasCoinState(VendingMachineState):
    # 实现已投币状态
    pass

class DispenseState(VendingMachineState):
    # 实现发放商品状态
    pass

class VendingMachine:
    # 实现自动售货机,根据当前状态执行不同行为
    pass

# 测试代码
def test_vending_machine():
    # 测试售货机在不同状态下的行为
    pass

参考答案:

from abc import ABC, abstractmethod

class VendingMachineState(ABC):
    @abstractmethod
    def insert_coin(self, machine):
        """投入硬币"""
        pass
    
    @abstractmethod
    def select_item(self, machine, item_code):
        """选择商品"""
        pass
    
    @abstractmethod
    def dispense_item(self, machine):
        """发放商品"""
        pass
    
    @abstractmethod
    def return_coin(self, machine):
        """退回硬币"""
        pass

class NoItemState(VendingMachineState):
    def insert_coin(self, machine):
        print("无法接受硬币,售货机内没有商品")
        return False
    
    def select_item(self, machine, item_code):
        print("无法选择商品,售货机内没有商品")
        return False
    
    def dispense_item(self, machine):
        print("无法发放商品,售货机内没有商品")
        return False
    
    def return_coin(self, machine):
        print("没有硬币可退回")
        return False
    
    def __str__(self):
        return "无货状态"

class HasItemState(VendingMachineState):
    def insert_coin(self, machine):
        print("已接受硬币")
        machine.set_state(machine.has_coin_state)
        return True
    
    def select_item(self, machine, item_code):
        print("请先投入硬币")
        return False
    
    def dispense_item(self, machine):
        print("请先投入硬币并选择商品")
        return False
    
    def return_coin(self, machine):
        print("没有硬币可退回")
        return False
    
    def __str__(self):
        return "有货状态"

class HasCoinState(VendingMachineState):
    def insert_coin(self, machine):
        print("已经投入了硬币,请选择商品或退回硬币")
        return False
    
    def select_item(self, machine, item_code):
        if item_code in machine.items and machine.items[item_code]["count"] > 0:
            machine.selected_item = item_code
            print(f"已选择商品: {machine.items[item_code]['name']}")
            machine.set_state(machine.dispense_state)
            return True
        else:
            print(f"商品编号 {item_code} 无效或已售罄")
            return False
    
    def dispense_item(self, machine):
        print("请先选择商品")
        return False
    
    def return_coin(self, machine):
        print("退回硬币")
        machine.set_state(machine.has_item_state)
        return True
    
    def __str__(self):
        return "已投币状态"

class DispenseState(VendingMachineState):
    def insert_coin(self, machine):
        print("正在发放商品,请稍等")
        return False
    
    def select_item(self, machine, item_code):
        print("正在发放商品,请稍等")
        return False
    
    def dispense_item(self, machine):
        if machine.selected_item and machine.selected_item in machine.items:
            item = machine.items[machine.selected_item]
            if item["count"] > 0:
                item["count"] -= 1
                print(f"发放商品: {item['name']}")
                
                # 检查是否还有商品
                if machine.get_total_items() > 0:
                    machine.set_state(machine.has_item_state)
                else:
                    machine.set_state(machine.no_item_state)
                
                machine.selected_item = None
                return True
        
        print("发放商品失败")
        machine.set_state(machine.has_item_state)
        return False
    
    def return_coin(self, machine):
        print("正在发放商品,无法退回硬币")
        return False
    
    def __str__(self):
        return "发放商品状态"

class VendingMachine:
    def __init__(self):
        # 初始化状态
        self.no_item_state = NoItemState()
        self.has_item_state = HasItemState()
        self.has_coin_state = HasCoinState()
        self.dispense_state = DispenseState()
        
        # 默认为无货状态
        self.state = self.no_item_state
        
        # 商品信息
        self.items = {}
        self.selected_item = None
    
    def set_state(self, state):
        """设置当前状态"""
        self.state = state
        print(f"售货机状态变更为: {state}")
    
    def add_item(self, code, name, price, count):
        """添加商品"""
        self.items[code] = {
            "name": name,
            "price": price,
            "count": count
        }
        print(f"添加商品: {name}, 价格: {price}, 数量: {count}")
        
        # 如果从无货变为有货,更新状态
        if self.state == self.no_item_state and self.get_total_items() > 0:
            self.set_state(self.has_item_state)
    
    def get_total_items(self):
        """获取商品总数"""
        return sum(item["count"] for item in self.items.values())
    
    def insert_coin(self):
        """投入硬币"""
        return self.state.insert_coin(self)
    
    def select_item(self, item_code):
        """选择商品"""
        return self.state.select_item(self, item_code)
    
    def dispense_item(self):
        """发放商品"""
        return self.state.dispense_item(self)
    
    def return_coin(self):
        """退回硬币"""
        return self.state.return_coin(self)
    
    def get_current_state(self):
        """获取当前状态"""
        return str(self.state)

# 测试代码
def test_vending_machine():
    # 创建售货机
    machine = VendingMachine()
    
    # 测试初始状态(无货)
    print("\n=== 测试初始状态(无货)===")
    print(f"当前状态: {machine.get_current_state()}")
    machine.insert_coin()  # 应该失败
    machine.select_item("A1")  # 应该失败
    machine.dispense_item()  # 应该失败
    machine.return_coin()  # 应该失败
    
    # 添加商品
    print("\n=== 添加商品 ===")
    machine.add_item("A1", "可乐", 2.5, 5)
    machine.add_item("A2", "薯片", 3.0, 3)
    machine.add_item("B1", "巧克力", 4.0, 2)
    print(f"当前状态: {machine.get_current_state()}")
    
    # 测试有货状态
    print("\n=== 测试有货状态 ===")
    machine.select_item("A1")  # 应该失败,需要先投币
    machine.dispense_item()  # 应该失败,需要先投币和选择商品
    
    # 投入硬币
    print("\n=== 投入硬币 ===")
    machine.insert_coin()  # 应该成功
    print(f"当前状态: {machine.get_current_state()}")
    
    # 测试已投币状态
    print("\n=== 测试已投币状态 ===")
    machine.insert_coin()  # 应该失败,已经投入了硬币
    
    # 选择商品
    print("\n=== 选择商品 ===")
    machine.select_item("A1")  # 应该成功
    print(f"当前状态: {machine.get_current_state()}")
    
    # 发放商品
    print("\n=== 发放商品 ===")
    machine.dispense_item()  # 应该成功
    print(f"当前状态: {machine.get_current_state()}")
    
    # 再次购买
    print("\n=== 再次购买 ===")
    machine.insert_coin()  # 应该成功
    machine.select_item("A2")  # 应该成功
    machine.dispense_item()  # 应该成功
    
    # 测试退币
    print("\n=== 测试退币 ===")
    machine.insert_coin()  # 应该成功
    print(f"当前状态: {machine.get_current_state()}")
    machine.return_coin()  # 应该成功
    print(f"当前状态: {machine.get_current_state()}")
    
    # 测试商品售罄
    print("\n=== 测试商品售罄 ===")
    # 购买所有剩余商品
    for _ in range(4):  # 可乐还剩3个,薯片还剩2个,巧克力还剩2个,总共7个
        machine.insert_coin()
        if machine.get_current_state() == "已投币状态":
            # 尝试购买可乐
            if machine.select_item("A1"):
                machine.dispense_item()
            # 如果可乐售罄,尝试购买薯片
            elif machine.select_item("A2"):
                machine.dispense_item()
            # 如果薯片售罄,尝试购买巧克力
            elif machine.select_item("B1"):
                machine.dispense_item()
    
    # 再购买3个商品,应该会售罄
    for _ in range(3):
        machine.insert_coin()
        if machine.get_current_state() == "已投币状态":
            if machine.select_item("A1"):
                machine.dispense_item()
            elif machine.select_item("A2"):
                machine.dispense_item()
            elif machine.select_item("B1"):
                machine.dispense_item()
    
    # 检查最终状态
    print("\n=== 最终状态 ===")
    print(f"当前状态: {machine.get_current_state()}")
    machine.insert_coin()  # 应该失败,无货

if __name__ == "__main__":
    test_vending_machine()

10. 模板方法模式 (Template Method Pattern)

题目描述:
设计一个数据分析系统,支持多种数据处理流程。所有流程都遵循相同的步骤:加载数据、清洗数据、分析数据和生成报告,但每个步骤的具体实现可能不同。

要求:

  1. 设计一个 DataAnalyzer 抽象基类,定义数据分析的模板方法和各个步骤的抽象方法
  2. 实现几种具体的数据分析器,如 SalesDataAnalyzerWebTrafficAnalyzerSocialMediaAnalyzer
  3. 模板方法应该定义整个分析流程,而具体子类只需实现各个步骤
  4. 提供钩子方法,允许子类选择性地覆盖某些步骤
  5. 编写测试代码,验证不同的数据分析器能够正确执行各自的分析流程

代码框架:

from abc import ABC, abstractmethod

class DataAnalyzer(ABC):
    def analyze(self):
        """模板方法,定义数据分析的整体流程"""
        self.load_data()
        self.clean_data()
        self.analyze_data()
        if self.should_generate_report():
            self.generate_report()
    
    @abstractmethod
    def load_data(self):
        """加载数据"""
        pass
    
    @abstractmethod
    def clean_data(self):
        """清洗数据"""
        pass
    
    @abstractmethod
    def analyze_data(self):
        """分析数据"""
        pass
    
    @abstractmethod
    def generate_report(self):
        """生成报告"""
        pass
    
    def should_generate_report(self):
        """钩子方法,决定是否生成报告"""
        return True

class SalesDataAnalyzer(DataAnalyzer):
    # 实现销售数据分析器
    pass

class WebTrafficAnalyzer(DataAnalyzer):
    # 实现网站流量分析器
    pass

class SocialMediaAnalyzer(DataAnalyzer):
    # 实现社交媒体分析器
    pass

# 测试代码
def test_data_analyzers():
    # 测试不同的数据分析器
    pass

参考答案:

from abc import ABC, abstractmethod
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
import os

class DataAnalyzer(ABC):
    def __init__(self, data_source):
        self.data_source = data_source
        self.data = None
        self.results = None
        self.report_path = None
    
    def analyze(self):
        """模板方法,定义数据分析的整体流程"""
        print(f"\n开始分析 {self.__class__.__name__}")
        print(f"数据源: {self.data_source}")
        
        start_time = datetime.now()
        
        self.load_data()
        self.clean_data()
        self.analyze_data()
        if self.should_generate_report():
            self.generate_report()
        
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        
        print(f"分析完成,耗时: {duration:.2f} 秒")
        if self.report_path:
            print(f"报告保存在: {self.report_path}")
    
    @abstractmethod
    def load_data(self):
        """加载数据"""
        pass
    
    @abstractmethod
    def clean_data(self):
        """清洗数据"""
        pass
    
    @abstractmethod
    def analyze_data(self):
        """分析数据"""
        pass
    
    @abstractmethod
    def generate_report(self):
        """生成报告"""
        pass
    
    def should_generate_report(self):
        """钩子方法,决定是否生成报告"""
        return True

class SalesDataAnalyzer(DataAnalyzer):
    def load_data(self):
        print("加载销售数据...")
        # 模拟从CSV文件加载销售数据
        # 在实际应用中,这里会从真实数据源加载数据
        self.data = pd.DataFrame({
            'date': pd.date_range(start='2023-01-01', periods=100),
            'product': np.random.choice(['A', 'B', 'C', 'D'], size=100),
            'region': np.random.choice(['North', 'South', 'East', 'West'], size=100),
            'sales': np.random.randint(100, 1000, size=100),
            'units': np.random.randint(1, 50, size=100)
        })
        print(f"加载了 {len(self.data)} 条销售记录")
    
    def clean_data(self):
        print("清洗销售数据...")
        # 删除缺失值
        self.data = self.data.dropna()
        # 删除异常值(销售额为0但单位数不为0的记录)
        self.data = self.data[~((self.data['sales'] == 0) & (self.data['units'] > 0))]
        # 添加每单位销售额列
        self.data['price_per_unit'] = self.data['sales'] / self.data['units']
        print(f"清洗后剩余 {len(self.data)} 条记录")
    
    def analyze_data(self):
        print("分析销售数据...")
        # 按产品和地区分组计算销售统计
        product_sales = self.data.groupby('product')['sales'].agg(['sum', 'mean', 'count'])
        region_sales = self.data.groupby('region')['sales'].agg(['sum', 'mean', 'count'])
        
        # 计算每月销售趋势
        self.data['month'] = self.data['date'].dt.to_period('M')
        monthly_sales = self.data.groupby('month')['sales'].sum()
        
        self.results = {
            'product_sales': product_sales,
            'region_sales': region_sales,
            'monthly_sales': monthly_sales
        }
        
        print("销售数据分析完成")
        print("\n产品销售统计:")
        print(product_sales)
        print("\n地区销售统计:")
        print(region_sales)
    
    def generate_report(self):
        print("生成销售报告...")
        # 创建报告目录
        os.makedirs('reports', exist_ok=True)
        
        # 生成报告文件名
        report_file = f"reports/sales_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        self.report_path = report_file
        
        # 写入报告
        with open(report_file, 'w') as f:
            f.write("销售数据分析报告\n")
            f.write("=" * 50 + "\n\n")
            
            f.write("产品销售统计:\n")
            f.write(str(self.results['product_sales']) + "\n\n")
            
            f.write("地区销售统计:\n")
            f.write(str(self.results['region_sales']) + "\n\n")
            
            f.write("月度销售趋势:\n")
            f.write(str(self.results['monthly_sales']) + "\n")
        
        # 生成图表
        plt.figure(figsize=(12, 8))
        
        # 产品销售额饼图
        plt.subplot(2, 2, 1)
        product_sales = self.results['product_sales']['sum']
        plt.pie(product_sales, labels=product_sales.index, autopct='%1.1f%%')
        plt.title('产品销售额占比')
        
        # 地区销售额柱状图
        plt.subplot(2, 2, 2)
        region_sales = self.results['region_sales']['sum']
        plt.bar(region_sales.index, region_sales.values)
        plt.title('地区销售额')
        
        # 月度销售趋势线图
        plt.subplot(2, 1, 2)
        monthly_sales = self.results['monthly_sales']
        plt.plot(range(len(monthly_sales)), monthly_sales.values)
        plt.xticks(range(len(monthly_sales)), [str(x) for x in monthly_sales.index], rotation=45)
        plt.title('月度销售趋势')
        
        plt.tight_layout()
        chart_file = f"reports/sales_charts_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
        plt.savefig(chart_file)
        plt.close()
        
        print(f"销售报告已保存: {report_file}")
        print(f"销售图表已保存: {chart_file}")

class WebTrafficAnalyzer(DataAnalyzer):
    def load_data(self):
        print("加载网站流量数据...")
        # 模拟从日志文件加载网站流量数据
        self.data = pd.DataFrame({
            'timestamp': pd.date_range(start='2023-01-01', periods=1000, freq='10min'),
            'ip': [f'192.168.1.{np.random.randint(1, 255)}' for _ in range(1000)],
            'url': np.random.choice(['/home', '/products', '/about', '/contact', '/login'], size=1000),
            'status_code': np.random.choice([200, 301, 404, 500], size=1000, p=[0.8, 0.1, 0.07, 0.03]),
            'user_agent': np.random.choice(['Chrome', 'Firefox', 'Safari', 'Edge'], size=1000),
            'response_time': np.random.uniform(0.1, 2.0, size=1000)
        })
        print(f"加载了 {len(self.data)} 条访问记录")
    
    def clean_data(self):
        print("清洗网站流量数据...")
        # 删除响应时间异常的记录
        self.data = self.data[self.data['response_time'] < 1.5]
        # 添加日期和小时列
        self.data['date'] = self.data['timestamp'].dt.date
        self.data['hour'] = self.data['timestamp'].dt.hour
        print(f"清洗后剩余 {len(self.data)} 条记录")
    
    def analyze_data(self):
        print("分析网站流量数据...")
        # 计算每日访问量
        daily_visits = self.data.groupby('date').size()
        
        # 计算每小时访问量
        hourly_visits = self.data.groupby('hour').size()
        
        # 计算页面访问统计
        page_visits = self.data.groupby('url').size().sort_values(ascending=False)
        
        # 计算状态码统计
        status_stats = self.data.groupby('status_code').size()
        
        # 计算平均响应时间
        avg_response_time = self.data.groupby('url')['response_time'].mean()
        
        self.results = {
            'daily_visits': daily_visits,
            'hourly_visits': hourly_visits,
            'page_visits': page_visits,
            'status_stats': status_stats,
            'avg_response_time': avg_response_time
        }
        
        print("网站流量分析完成")
        print("\n页面访问统计:")
        print(page_visits)
        print("\n状态码统计:")
        print(status_stats)
    
    def generate_report(self):
        print("生成网站流量报告...")
        # 创建报告目录
        os.makedirs('reports', exist_ok=True)
        
        # 生成报告文件名
        report_file = f"reports/web_traffic_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        self.report_path = report_file
        
        # 写入报告
        with open(report_file, 'w') as f:
            f.write("网站流量分析报告\n")
            f.write("=" * 50 + "\n\n")
            
            f.write("每日访问量:\n")
            f.write(str(self.results['daily_visits'].tail(5)) + "\n\n")
            
            f.write("每小时访问量:\n")
            f.write(str(self.results['hourly_visits']) + "\n\n")
            
            f.write("页面访问统计:\n")
            f.write(str(self.results['page_visits']) + "\n\n")
            
            f.write("状态码统计:\n")
            f.write(str(self.results['status_stats']) + "\n\n")
            
            f.write("平均响应时间:\n")
            f.write(str(self.results['avg_response_time']) + "\n")
        
        print(f"网站流量报告已保存: {report_file}")

class SocialMediaAnalyzer(DataAnalyzer):
    def load_data(self):
        print("加载社交媒体数据...")
        # 模拟从API加载社交媒体数据
        self.data = pd.DataFrame({
            'post_id': range(1, 201),
            'timestamp': pd.date_range(start='2023-01-01', periods=200),
            'content': [f'Post content {i}' for i in range(1, 201)],
            'likes': np.random.randint(0, 1000, size=200),
            'shares': np.random.randint(0, 200, size=200),
            'comments': np.random.randint(0, 100, size=200),
            'user_id': np.random.randint(1, 50, size=200)
        })
        print(f"加载了 {len(self.data)} 条社交媒体记录")
    
    def clean_data(self):
        print("清洗社交媒体数据...")
        # 计算互动总数
        self.data['engagement'] = self.data['likes'] + self.data['shares'] * 2 + self.data['comments'] * 3
        # 添加日期列
        self.data['date'] = self.data['timestamp'].dt.date
        # 添加星期几列
        self.data['day_of_week'] = self.data['timestamp'].dt.day_name()
        print("社交媒体数据清洗完成")
    
    def analyze_data(self):
        print("分析社交媒体数据...")
        # 计算每日互动统计
        daily_engagement = self.data.groupby('date')['engagement'].sum()
        
        # 计算星期几互动统计
        day_of_week_engagement = self.data.groupby('day_of_week')['engagement'].mean().reindex([
            'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'
        ])
        
        # 计算互动最高的帖子
        top_posts = self.data.sort_values('engagement', ascending=False).head(10)
        
        # 计算用户互动统计
        user_engagement = self.data.groupby('user_id')['engagement'].sum().sort_values(ascending=False)
        
        self.results = {
            'daily_engagement': daily_engagement,
            'day_of_week_engagement': day_of_week_engagement,
            'top_posts': top_posts,
            'user_engagement': user_engagement
        }
        
        print("社交媒体数据分析完成")
        print("\n星期几互动统计:")
        print(day_of_week_engagement)
        print("\n互动最高的帖子:")
        print(top_posts[['post_id', 'likes', 'shares', 'comments', 'engagement']].head())
    
    def should_generate_report(self):
        # 只有当有高互动帖子时才生成报告
        return any(self.data['engagement'] > 1000)
    
    def generate_report(self):
        print("生成社交媒体报告...")
        # 创建报告目录
        os.makedirs('reports', exist_ok=True)
        
        # 生成报告文件名
        report_file = f"reports/social_media_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        self.report_path = report_file
        
        # 写入报告
        with open(report_file, 'w') as f:
            f.write("社交媒体分析报告\n")
            f.write("=" * 50 + "\n\n")
            
            f.write("每日互动统计:\n")
            f.write(str(self.results['daily_engagement'].tail(5)) + "\n\n")
            
            f.write("星期几互动统计:\n")
            f.write(str(self.results['day_of_week_engagement']) + "\n\n")
            
			f.write("互动最高的帖子:\n")
			top_posts = self.results['top_posts'][['post_id', 'likes', 'shares', 'comments', 'engagement']]
			f.write(str(top_posts.head()) + "\n\n")
			
			f.write("用户互动统计 (前5名):\n")
			f.write(str(self.results['user_engagement'].head(5)) + "\n")
			
			print(f"社交媒体报告已保存到 {report_file}")

# 测试代码
def test_data_analyzers():
	# 创建并测试销售数据分析器
	sales_analyzer = SalesDataAnalyzer("sales_data.csv")
	sales_analyzer.analyze()
	
	# 创建并测试网站流量分析器
	web_analyzer = WebTrafficAnalyzer("web_logs.json")
	web_analyzer.analyze()
	
	# 创建并测试社交媒体分析器
	social_analyzer = SocialMediaAnalyzer("social_api")
	social_analyzer.analyze()
	
	# 测试钩子方法
	print("\n=== 测试钩子方法 ===")
	# 创建一个不生成报告的分析器
	class NoReportAnalyzer(SalesDataAnalyzer):
		def should_generate_report(self):
			return False
	
	no_report_analyzer = NoReportAnalyzer("sales_data.csv")
	no_report_analyzer.analyze()

if __name__ == "__main__":
	test_data_analyzers()

11. 综合应用题:电子商务系统

题目描述:
设计一个简化版的电子商务系统,综合应用多种设计模式。系统需要支持商品浏览、购物车管理、订单处理和支付等功能。

要求:

  1. 使用工厂模式创建不同类型的商品
  2. 使用单例模式管理购物车
  3. 使用观察者模式实现商品库存变化通知
  4. 使用策略模式处理不同的支付方式
  5. 使用命令模式实现订单操作(创建、取消、修改)
  6. 使用装饰器模式实现商品的附加服务(如礼品包装、快递保险等)
  7. 系统应该具有良好的可扩展性和可维护性
  8. 编写测试代码,验证系统的各项功能

代码框架:

# 这里只提供基本框架,需要综合应用多种设计模式实现完整功能

# 商品相关类
class Product:
    # 商品基类
    pass

class ProductFactory:
    # 商品工厂
    pass

# 购物车相关类
class ShoppingCart:
    # 购物车(单例)
    pass

# 库存相关类
class InventorySubject:
    # 库存主题
    pass

class InventoryObserver:
    # 库存观察者接口
    pass

# 支付相关类
class PaymentStrategy:
    # 支付策略接口
    pass

# 订单相关类
class OrderCommand:
    # 订单命令接口
    pass

# 商品服务装饰器
class ProductServiceDecorator:
    # 商品服务装饰器
    pass

# 电子商务系统
class ECommerceSystem:
    # 整合各组件的系统类
    pass

# 测试代码
def test_ecommerce_system():
    # 测试电子商务系统的各项功能
    pass

参考答案:

from abc import ABC, abstractmethod
import uuid
from datetime import datetime

# =============== 1. 工厂模式 - 商品创建 ===============

class Product:
    def __init__(self, id, name, price, category, description):
        self.id = id
        self.name = name
        self.price = price
        self.category = category
        self.description = description
    
    def __str__(self):
        return f"{self.name} (${self.price:.2f})"

class ElectronicProduct(Product):
    def __init__(self, id, name, price, description, warranty_period, brand):
        super().__init__(id, name, price, "Electronics", description)
        self.warranty_period = warranty_period
        self.brand = brand

class ClothingProduct(Product):
    def __init__(self, id, name, price, description, size, color):
        super().__init__(id, name, price, "Clothing", description)
        self.size = size
        self.color = color

class BookProduct(Product):
    def __init__(self, id, name, price, description, author, publisher):
        super().__init__(id, name, price, "Books", description)
        self.author = author
        self.publisher = publisher

class ProductFactory:
    @staticmethod
    def create_product(product_type, **kwargs):
        if product_type == "electronics":
            return ElectronicProduct(
                id=kwargs.get('id', str(uuid.uuid4())),
                name=kwargs.get('name', ''),
                price=kwargs.get('price', 0.0),
                description=kwargs.get('description', ''),
                warranty_period=kwargs.get('warranty_period', '1 year'),
                brand=kwargs.get('brand', '')
            )
        elif product_type == "clothing":
            return ClothingProduct(
                id=kwargs.get('id', str(uuid.uuid4())),
                name=kwargs.get('name', ''),
                price=kwargs.get('price', 0.0),
                description=kwargs.get('description', ''),
                size=kwargs.get('size', 'M'),
                color=kwargs.get('color', 'Black')
            )
        elif product_type == "book":
            return BookProduct(
                id=kwargs.get('id', str(uuid.uuid4())),
                name=kwargs.get('name', ''),
                price=kwargs.get('price', 0.0),
                description=kwargs.get('description', ''),
                author=kwargs.get('author', ''),
                publisher=kwargs.get('publisher', '')
            )
        else:
            raise ValueError(f"Unknown product type: {product_type}")

# =============== 2. 单例模式 - 购物车 ===============

class ShoppingCart:
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(ShoppingCart, cls).__new__(cls)
            cls._instance.items = {}  # 商品ID -> (商品, 数量)
        return cls._instance
    
    def add_item(self, product, quantity=1):
        if product.id in self.items:
            self.items[product.id] = (product, self.items[product.id][1] + quantity)
        else:
            self.items[product.id] = (product, quantity)
    
    def remove_item(self, product_id):
        if product_id in self.items:
            del self.items[product_id]
    
    def update_quantity(self, product_id, quantity):
        if product_id in self.items:
            if quantity <= 0:
                self.remove_item(product_id)
            else:
                product = self.items[product_id][0]
                self.items[product_id] = (product, quantity)
    
    def get_total(self):
        return sum(product.price * quantity for product, quantity in self.items.values())
    
    def clear(self):
        self.items.clear()
    
    def get_items(self):
        return [(product, quantity) for product, quantity in self.items.values()]
    
    def __str__(self):
        if not self.items:
            return "购物车为空"
        
        cart_str = "购物车内容:\n"
        for product, quantity in self.items.values():
            cart_str += f"  - {product.name} x {quantity}: ${product.price * quantity:.2f}\n"
        cart_str += f"总计: ${self.get_total():.2f}"
        return cart_str

# =============== 3. 观察者模式 - 库存管理 ===============

class InventoryObserver(ABC):
    @abstractmethod
    def update(self, product_id, new_quantity):
        pass

class InventorySubject:
    def __init__(self):
        self.observers = []
        self.inventory = {}  # 商品ID -> 数量
    
    def register_observer(self, observer):
        if observer not in self.observers:
            self.observers.append(observer)
    
    def remove_observer(self, observer):
        if observer in self.observers:
            self.observers.remove(observer)
    
    def notify_observers(self, product_id, new_quantity):
        for observer in self.observers:
            observer.update(product_id, new_quantity)
    
    def add_product(self, product, quantity):
        self.inventory[product.id] = quantity
    
    def update_quantity(self, product_id, quantity):
        if product_id in self.inventory:
            old_quantity = self.inventory[product_id]
            self.inventory[product_id] = quantity
            if old_quantity != quantity:
                self.notify_observers(product_id, quantity)
    
    def get_quantity(self, product_id):
        return self.inventory.get(product_id, 0)
    
    def is_available(self, product_id, quantity=1):
        return self.get_quantity(product_id) >= quantity

class LowStockNotifier(InventoryObserver):
    def __init__(self, threshold=5):
        self.threshold = threshold
    
    def update(self, product_id, new_quantity):
        if new_quantity <= self.threshold:
            print(f"低库存警告: 商品 {product_id} 库存仅剩 {new_quantity} 件")

class OutOfStockNotifier(InventoryObserver):
    def update(self, product_id, new_quantity):
        if new_quantity == 0:
            print(f"缺货警告: 商品 {product_id} 已售罄")

# =============== 4. 策略模式 - 支付处理 ===============

class PaymentStrategy(ABC):
    @abstractmethod
    def pay(self, amount):
        pass

class CreditCardPayment(PaymentStrategy):
    def __init__(self, card_number, expiry_date, cvv):
        self.card_number = card_number
        self.expiry_date = expiry_date
        self.cvv = cvv
    
    def pay(self, amount):
        # 实际应用中,这里会调用信用卡支付API
        print(f"使用信用卡支付 ${amount:.2f}")
        print(f"卡号: {self.card_number[-4:]} (到期日: {self.expiry_date})")
        return {"success": True, "transaction_id": str(uuid.uuid4())}

class PayPalPayment(PaymentStrategy):
    def __init__(self, email):
        self.email = email
    
    def pay(self, amount):
        # 实际应用中,这里会调用PayPal API
        print(f"使用PayPal支付 ${amount:.2f}")
        print(f"PayPal账户: {self.email}")
        return {"success": True, "transaction_id": str(uuid.uuid4())}

class BankTransferPayment(PaymentStrategy):
    def __init__(self, account_number, bank_code):
        self.account_number = account_number
        self.bank_code = bank_code
    
    def pay(self, amount):
        # 实际应用中,这里会调用银行转账API
        print(f"使用银行转账支付 ${amount:.2f}")
        print(f"账号: {self.account_number}, 银行代码: {self.bank_code}")
        return {"success": True, "transaction_id": str(uuid.uuid4())}

# =============== 5. 命令模式 - 订单处理 ===============

class Order:
    def __init__(self, order_id, items, customer_info, payment_info, status="pending"):
        self.order_id = order_id
        self.items = items  # [(product, quantity), ...]
        self.customer_info = customer_info
        self.payment_info = payment_info
        self.status = status
        self.created_at = datetime.now()
        self.updated_at = self.created_at
    
    def update_status(self, status):
        self.status = status
        self.updated_at = datetime.now()
    
    def get_total(self):
        return sum(product.price * quantity for product, quantity in self.items)
    
    def __str__(self):
        order_str = f"订单 #{self.order_id} ({self.status})\n"
        order_str += f"创建时间: {self.created_at}\n"
        order_str += "商品:\n"
        for product, quantity in self.items:
            order_str += f"  - {product.name} x {quantity}: ${product.price * quantity:.2f}\n"
        order_str += f"总计: ${self.get_total():.2f}"
        return order_str

class OrderCommand(ABC):
    @abstractmethod
    def execute(self):
        pass
    
    @abstractmethod
    def undo(self):
        pass

class CreateOrderCommand(OrderCommand):
    def __init__(self, order_manager, items, customer_info, payment_strategy):
        self.order_manager = order_manager
        self.items = items
        self.customer_info = customer_info
        self.payment_strategy = payment_strategy
        self.order = None
    
    def execute(self):
        # 创建订单
        order_id = str(uuid.uuid4())
        self.order = Order(order_id, self.items, self.customer_info, self.payment_strategy)
        
        # 处理支付
        amount = self.order.get_total()
        payment_result = self.payment_strategy.pay(amount)
        
        if payment_result["success"]:
            self.order.update_status("paid")
            self.order_manager.add_order(self.order)
            
            # 更新库存
            for product, quantity in self.items:
                current_quantity = self.order_manager.inventory.get_quantity(product.id)
                self.order_manager.inventory.update_quantity(product.id, current_quantity - quantity)
            
            return self.order
        else:
            self.order.update_status("payment_failed")
            return None
    
    def undo(self):
        if self.order and self.order.status == "paid":
            # 恢复库存
            for product, quantity in self.items:
                current_quantity = self.order_manager.inventory.get_quantity(product.id)
                self.order_manager.inventory.update_quantity(product.id, current_quantity + quantity)
            
            # 移除订单
            self.order_manager.remove_order(self.order.order_id)
            return True
        return False

class CancelOrderCommand(OrderCommand):
    def __init__(self, order_manager, order_id):
        self.order_manager = order_manager
        self.order_id = order_id
        self.order = None
    
    def execute(self):
        self.order = self.order_manager.get_order(self.order_id)
        if self.order and self.order.status in ["pending", "paid"]:
            # 恢复库存
            for product, quantity in self.order.items:
                current_quantity = self.order_manager.inventory.get_quantity(product.id)
                self.order_manager.inventory.update_quantity(product.id, current_quantity + quantity)
            
            self.order.update_status("cancelled")
            return True
        return False
    
    def undo(self):
        if self.order and self.order.status == "cancelled":
            # 重新扣减库存
            for product, quantity in self.order.items:
                current_quantity = self.order_manager.inventory.get_quantity(product.id)
                self.order_manager.inventory.update_quantity(product.id, current_quantity - quantity)
            
            self.order.update_status("paid")
            return True
        return False

class OrderManager:
    def __init__(self, inventory):
        self.orders = {}  # 订单ID -> 订单
        self.inventory = inventory
    
    def add_order(self, order):
        self.orders[order.order_id] = order
    
    def remove_order(self, order_id):
        if order_id in self.orders:
            del self.orders[order_id]
    
    def get_order(self, order_id):
        return self.orders.get(order_id)
    
    def get_all_orders(self):
        return list(self.orders.values())

# =============== 6. 装饰器模式 - 商品服务 ===============

class ProductService(ABC):
    @abstractmethod
    def get_description(self):
        pass
    
    @abstractmethod
    def get_cost(self):
        pass

class ConcreteProduct(ProductService):
    def __init__(self, product, quantity=1):
        self.product = product
        self.quantity = quantity
    
    def get_description(self):
        return f"{self.product.name} x {self.quantity}"
    
    def get_cost(self):
        return self.product.price * self.quantity

class ProductServiceDecorator(ProductService):
    def __init__(self, product_service):
        self.product_service = product_service
    
    def get_description(self):
        return self.product_service.get_description()
    
    def get_cost(self):
        return self.product_service.get_cost()

class GiftWrappingDecorator(ProductServiceDecorator):
    def __init__(self, product_service, wrapping_style="标准"):
        super().__init__(product_service)
        self.wrapping_style = wrapping_style
    
    def get_description(self):
        return f"{self.product_service.get_description()} [礼品包装: {self.wrapping_style}]"
    
    def get_cost(self):
        # 礼品包装额外收费
        wrapping_cost = 5.0 if self.wrapping_style == "豪华" else 2.0
        return self.product_service.get_cost() + wrapping_cost

class ExpressShippingDecorator(ProductServiceDecorator):
    def get_description(self):
        return f"{self.product_service.get_description()} [快递配送]"
    
    def get_cost(self):
        # 快递配送额外收费
        return self.product_service.get_cost() + 10.0

class InsuranceDecorator(ProductServiceDecorator):
    def get_description(self):
        return f"{self.product_service.get_description()} [运输保险]"
    
    def get_cost(self):
        # 保险费用为商品价值的5%
        return self.product_service.get_cost() + (self.product_service.get_cost() * 0.05)

# =============== 电子商务系统 ===============

class ECommerceSystem:
    def __init__(self):
        self.product_factory = ProductFactory()
        self.shopping_cart = ShoppingCart()
        self.inventory = InventorySubject()
        self.order_manager = OrderManager(self.inventory)
        
        # 注册库存观察者
        self.inventory.register_observer(LowStockNotifier(threshold=5))
        self.inventory.register_observer(OutOfStockNotifier())
    
    def create_product(self, product_type, **kwargs):
        return self.product_factory.create_product(product_type, **kwargs)
    
    def add_product_to_inventory(self, product, quantity):
        self.inventory.add_product(product, quantity)
    
    def add_to_cart(self, product, quantity=1):
        if self.inventory.is_available(product.id, quantity):
            self.shopping_cart.add_item(product, quantity)
            return True
        return False
    
    def remove_from_cart(self, product_id):
        self.shopping_cart.remove_item(product_id)
    
    def update_cart_quantity(self, product_id, quantity):
        self.shopping_cart.update_quantity(product_id, quantity)
    
    def checkout(self, customer_info, payment_strategy):
        items = self.shopping_cart.get_items()
        if not items:
            print("购物车为空,无法结账")
            return None
        
        # 检查库存
        for product, quantity in items:
            if not self.inventory.is_available(product.id, quantity):
                print(f"商品 {product.name} 库存不足,无法结账")
                return None
        
        # 创建订单
        command = CreateOrderCommand(self.order_manager, items, customer_info, payment_strategy)
        order = command.execute()
        
        if order:
            print(f"订单创建成功: {order.order_id}")
            self.shopping_cart.clear()
        else:
            print("订单创建失败")
        
        return order
    
    def cancel_order(self, order_id):
        command = CancelOrderCommand(self.order_manager, order_id)
        if command.execute():
            print(f"订单 {order_id} 已取消")
            return True
        else:
            print(f"无法取消订单 {order_id}")
            return False
    
    def get_order(self, order_id):
        return self.order_manager.get_order(order_id)
    
    def get_all_orders(self):
        return self.order_manager.get_all_orders()
    
    def apply_product_service(self, product, quantity=1, gift_wrapping=False, express_shipping=False, insurance=False, wrapping_style="标准"):
        # 创建基础商品服务
        product_service = ConcreteProduct(product, quantity)
        
        # 应用装饰器
        if gift_wrapping:
            product_service = GiftWrappingDecorator(product_service, wrapping_style)
        
        if express_shipping:
            product_service = ExpressShippingDecorator(product_service)
        
        if insurance:
            product_service = InsuranceDecorator(product_service)
        
        return product_service

# =============== 测试代码 ===============

def test_ecommerce_system():
    # 创建电子商务系统
    system = ECommerceSystem()
    
    # 创建商品
    laptop = system.create_product("electronics", 
                                  name="MacBook Pro", 
                                  price=1299.99, 
                                  description="13-inch, 8GB RAM, 256GB SSD", 
                                  warranty_period="2 years", 
                                  brand="Apple")
    
    tshirt = system.create_product("clothing", 
                                  name="Cotton T-Shirt", 
                                  price=19.99, 
                                  description="Comfortable cotton t-shirt", 
                                  size="L", 
                                  color="Blue")
    
    book = system.create_product("book", 
                                name="Python Design Patterns", 
                                price=39.99, 
                                description="Learn design patterns in Python", 
                                author="John Smith", 
                                publisher="Tech Books")
    
    # 添加商品到库存
    system.add_product_to_inventory(laptop, 10)
    system.add_product_to_inventory(tshirt, 50)
    system.add_product_to_inventory(book, 20)
    
    print("\n=== 测试工厂模式 ===")
    print(f"电子产品: {laptop}")
    print(f"服装: {tshirt}")
    print(f"图书: {book}")
    
    # 测试购物车(单例模式)
    print("\n=== 测试单例模式 ===")
    cart1 = ShoppingCart()
    cart2 = ShoppingCart()
    print(f"cart1 和 cart2 是同一个实例: {cart1 is cart2}")
    
    # 添加商品到购物车
    system.add_to_cart(laptop, 1)
    system.add_to_cart(tshirt, 2)
    print(system.shopping_cart)
    
    # 测试观察者模式
    print("\n=== 测试观察者模式 ===")
    # 减少库存,触发低库存通知
    current_quantity = system.inventory.get_quantity(laptop.id)
    system.inventory.update_quantity(laptop.id, 3)
    
    # 减少库存至0,触发缺货通知
    system.inventory.update_quantity(book.id, 0)
    
    # 测试策略模式
    print("\n=== 测试策略模式 ===")
    credit_card = CreditCardPayment("4111-1111-1111-1111", "12/25", "123")
    paypal = PayPalPayment("customer@example.com")
    bank_transfer = BankTransferPayment("12345678", "ABCDEF")
    
    # 客户信息
    customer_info = {
        "name": "张三",
        "email": "zhangsan@example.com",
        "address": "北京市朝阳区123号",
        "phone": "13800138000"
    }
    
    # 使用信用卡结账
    order1 = system.checkout(customer_info, credit_card)
    
    # 再次添加商品到购物车
    system.add_to_cart(book, 1)  # 这应该失败,因为书已经没有库存了
    system.add_to_cart(tshirt, 1)
    
    # 使用PayPal结账
    order2 = system.checkout(customer_info, paypal)
    
    # 测试命令模式
    print("\n=== 测试命令模式 ===")
    # 取消订单
    if order1:
        system.cancel_order(order1.order_id)
    
    # 查看所有订单
    print("\n所有订单:")
    for order in system.get_all_orders():
        print(order)
    
    # 测试装饰器模式
    print("\n=== 测试装饰器模式 ===")
    # 创建带有附加服务的商品
    laptop_with_services = system.apply_product_service(
        laptop, 
        quantity=1, 
        gift_wrapping=True, 
        express_shipping=True, 
        insurance=True,
        wrapping_style="豪华"
    )
    
    print(f"商品描述: {laptop_with_services.get_description()}")
    print(f"总价: ${laptop_with_services.get_cost():.2f}")
    
    # 添加库存以便测试
    system.inventory.update_quantity(laptop.id, 5)
    
    # 添加带服务的商品到购物车
    system.add_to_cart(laptop, 1)
    
    # 使用银行转账结账
    order3 = system.checkout(customer_info, bank_transfer)
    
    print("\n最终订单列表:")
    for order in system.get_all_orders():
        print(order)

if __name__ == "__main__":
    test_ecommerce_system()