常见设计模式 Python 实现
编辑1.单例模式(Singleton Pattern)
题目描述:
实现一个数据库连接池管理器,确保系统中只有一个连接池实例,无论创建多少次都返回相同的实例。连接池应该支持获取连接和释放连接的功能。
要求:
1.实现一个 DatabaseConnectionPool
类,使用单例模式确保全局唯一
2.实现 get_connection()
和 release_connection(connection)
方法
3.连接池应有最大连接数限制
4.提供一个测试函数,验证从不同地方获取的连接池是同一个实例
代码框架:
class DatabaseConnectionPool:
# 请实现单例模式
def __init__(self, max_connections=10):
# 初始化连接池
pass
def get_connection(self):
# 获取一个数据库连接
pass
def release_connection(self, connection):
# 释放一个数据库连接回到池中
pass
# 测试代码
def test_singleton_connection_pool():
# 验证单例模式是否正确实现
pass
参考答案:
class DatabaseConnectionPool:
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(DatabaseConnectionPool, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, max_connections=10):
# 只初始化一次
if not self._initialized:
self.max_connections = max_connections
self.connections = []
self.available_connections = []
self.in_use_connections = set()
self._initialized = True
# 创建初始连接
for _ in range(max_connections):
connection = self._create_connection()
self.connections.append(connection)
self.available_connections.append(connection)
def _create_connection(self):
# 模拟创建数据库连接
# 在实际应用中,这里会创建真正的数据库连接
return {"id": id({}), "connected": True}
def get_connection(self):
if not self.available_connections:
if len(self.connections) < self.max_connections:
# 如果还没达到最大连接数,可以创建新连接
connection = self._create_connection()
self.connections.append(connection)
self.available_connections.append(connection)
else:
raise Exception("No available connections in the pool")
connection = self.available_connections.pop(0)
self.in_use_connections.add(connection["id"])
return connection
def release_connection(self, connection):
if connection["id"] in self.in_use_connections:
self.in_use_connections.remove(connection["id"])
self.available_connections.append(connection)
else:
raise Exception("Connection not in use or not from this pool")
def status(self):
return {
"total": len(self.connections),
"available": len(self.available_connections),
"in_use": len(self.in_use_connections)
}
# 测试代码
def test_singleton_connection_pool():
# 创建两个连接池实例
pool1 = DatabaseConnectionPool(max_connections=5)
pool2 = DatabaseConnectionPool(max_connections=10) # 这个参数会被忽略,因为已经初始化过了
# 验证它们是同一个实例
print(f"Is pool1 the same instance as pool2? {pool1 is pool2}")
print(f"Pool max connections: {pool1.max_connections}") # 应该是5,而不是10
# 测试连接获取和释放
conn1 = pool1.get_connection()
conn2 = pool1.get_connection()
print(f"Pool status after getting 2 connections: {pool1.status()}")
pool1.release_connection(conn1)
print(f"Pool status after releasing 1 connection: {pool1.status()}")
# 验证从pool2获取的连接实际上是从同一个池中获取的
conn3 = pool2.get_connection()
print(f"Pool status after getting another connection from pool2: {pool1.status()}")
# 清理
pool2.release_connection(conn2)
pool2.release_connection(conn3)
print(f"Final pool status: {pool1.status()}")
if __name__ == "__main__":
test_singleton_connection_pool()
2.工厂模式(Factory Pattern)
题目描述:
设计一个支持多种数据格式解析的文件处理系统。系统需要能够处理 CSV、JSON、XML 和 YAML 格式的文件,并将它们转换为统一的内部数据结构。
要求:
1.设计一个 FileParser
抽象基类,定义解析文件的接口
2.为每种文件格式实现具体的解析器类
3.实现一个 ParserFactory
工厂类,根据文件扩展名创建对应的解析器
4.编写测试代码,验证工厂能够正确创建不同类型的解析器
代码框架:
from abc import ABC, abstractmethod
class FileParser(ABC):
@abstractmethod
def parse(self, file_path):
"""解析文件并返回数据"""
pass
class CSVParser(FileParser):
# 实现CSV解析
pass
class JSONParser(FileParser):
# 实现JSON解析
pass
class XMLParser(FileParser):
# 实现XML解析
pass
class YAMLParser(FileParser):
# 实现YAML解析
pass
class ParserFactory:
# 实现工厂方法,根据文件扩展名创建对应的解析器
pass
# 测试代码
def test_parser_factory():
# 测试工厂是否能正确创建不同类型的解析器
pass
参考答案:
from abc import ABC, abstractmethod
import csv
import json
import xml.etree.ElementTree as ET
import yaml
import os
class FileParser(ABC):
@abstractmethod
def parse(self, file_path):
"""解析文件并返回数据"""
pass
def validate_file(self, file_path):
"""验证文件是否存在"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
class CSVParser(FileParser):
def parse(self, file_path):
self.validate_file(file_path)
data = []
with open(file_path, 'r', newline='') as file:
csv_reader = csv.DictReader(file)
for row in csv_reader:
data.append(dict(row))
return data
class JSONParser(FileParser):
def parse(self, file_path):
self.validate_file(file_path)
with open(file_path, 'r') as file:
return json.load(file)
class XMLParser(FileParser):
def parse(self, file_path):
self.validate_file(file_path)
tree = ET.parse(file_path)
root = tree.getroot()
def element_to_dict(element):
result = {}
for child in element:
if len(child) == 0:
result[child.tag] = child.text
else:
result[child.tag] = element_to_dict(child)
return result
return element_to_dict(root)
class YAMLParser(FileParser):
def parse(self, file_path):
self.validate_file(file_path)
with open(file_path, 'r') as file:
return yaml.safe_load(file)
class ParserFactory:
@staticmethod
def create_parser(file_path):
"""根据文件扩展名创建对应的解析器"""
_, extension = os.path.splitext(file_path)
extension = extension.lower()
if extension == '.csv':
return CSVParser()
elif extension == '.json':
return JSONParser()
elif extension == '.xml':
return XMLParser()
elif extension in ['.yaml', '.yml']:
return YAMLParser()
else:
raise ValueError(f"Unsupported file extension: {extension}")
# 测试代码
def test_parser_factory():
# 创建测试文件
# CSV测试文件
with open('test.csv', 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(['name', 'age'])
writer.writerow(['Alice', '30'])
writer.writerow(['Bob', '25'])
# JSON测试文件
with open('test.json', 'w') as file:
json.dump([{'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 25}], file)
# XML测试文件
with open('test.xml', 'w') as file:
file.write('<data><person><name>Alice</name><age>30</age></person><person><name>Bob</name><age>25</age></person></data>')
# YAML测试文件
with open('test.yaml', 'w') as file:
file.write("""
- name: Alice
age: 30
- name: Bob
age: 25
""")
# 测试工厂创建解析器
try:
# 测试CSV解析器
csv_parser = ParserFactory.create_parser('test.csv')
assert isinstance(csv_parser, CSVParser)
csv_data = csv_parser.parse('test.csv')
print(f"CSV Parser result: {csv_data}")
# 测试JSON解析器
json_parser = ParserFactory.create_parser('test.json')
assert isinstance(json_parser, JSONParser)
json_data = json_parser.parse('test.json')
print(f"JSON Parser result: {json_data}")
# 测试XML解析器
xml_parser = ParserFactory.create_parser('test.xml')
assert isinstance(xml_parser, XMLParser)
xml_data = xml_parser.parse('test.xml')
print(f"XML Parser result: {xml_data}")
# 测试YAML解析器
yaml_parser = ParserFactory.create_parser('test.yaml')
assert isinstance(yaml_parser, YAMLParser)
yaml_data = yaml_parser.parse('test.yaml')
print(f"YAML Parser result: {yaml_data}")
# 测试不支持的文件类型
try:
ParserFactory.create_parser('test.txt')
assert False, "Should raise ValueError for unsupported file extension"
except ValueError:
print("Successfully caught ValueError for unsupported file extension")
print("All tests passed!")
finally:
# 清理测试文件
for file_name in ['test.csv', 'test.json', 'test.xml', 'test.yaml']:
if os.path.exists(file_name):
os.remove(file_name)
if __name__ == "__main__":
test_parser_factory()
3.观察者模式(Observer Pattern)
题目描述:
实现一个股票市场监控系统,当股票价格变化时通知注册的观察者。系统需要支持多种类型的观察者,如邮件通知、短信通知和应用内通知。
要求:
1.设计一个 Subject
接口,定义注册、移除和通知观察者的方法
2.实现一个 StockMarket
类作为具体主题,管理股票价格并在价格变化时通知观察者
3.设计一个 Observer
接口,定义更新方法
4.实现至少三种不同的观察者:EmailNotifier
、SMSNotifier
和 AppNotifier
5.编写测试代码,验证当股票价格变化时,所有注册的观察者都能收到通知
代码框架:
from abc import ABC, abstractmethod
class Observer(ABC):
@abstractmethod
def update(self, stock_name, price):
"""接收股票价格更新的通知"""
pass
class Subject(ABC):
@abstractmethod
def register_observer(self, observer):
"""注册观察者"""
pass
@abstractmethod
def remove_observer(self, observer):
"""移除观察者"""
pass
@abstractmethod
def notify_observers(self):
"""通知所有观察者"""
pass
class StockMarket(Subject):
# 实现股票市场类,管理股票价格并通知观察者
pass
class EmailNotifier(Observer):
# 实现邮件通知观察者
pass
class SMSNotifier(Observer):
# 实现短信通知观察者
pass
class AppNotifier(Observer):
# 实现应用内通知观察者
pass
# 测试代码
def test_stock_market_observers():
# 测试股票价格变化时是否正确通知所有观察者
pass
参考答案:
from abc import ABC, abstractmethod
class Observer(ABC):
@abstractmethod
def update(self, stock_name, price):
"""接收股票价格更新的通知"""
pass
class Subject(ABC):
@abstractmethod
def register_observer(self, observer):
"""注册观察者"""
pass
@abstractmethod
def remove_observer(self, observer):
"""移除观察者"""
pass
@abstractmethod
def notify_observers(self):
"""通知所有观察者"""
pass
class StockMarket(Subject):
def __init__(self):
self.observers = []
self.stocks = {} # 股票名称 -> 价格
def register_observer(self, observer):
if observer not in self.observers:
self.observers.append(observer)
def remove_observer(self, observer):
if observer in self.observers:
self.observers.remove(observer)
def notify_observers(self):
for stock_name, price in self.stocks.items():
for observer in self.observers:
observer.update(stock_name, price)
def set_stock_price(self, stock_name, price):
"""设置股票价格,如果价格变化则通知观察者"""
old_price = self.stocks.get(stock_name)
if old_price != price:
self.stocks[stock_name] = price
print(f"Stock price updated: {stock_name} = {price}")
self.notify_observers()
def get_stock_price(self, stock_name):
"""获取股票价格"""
return self.stocks.get(stock_name)
class EmailNotifier(Observer):
def __init__(self, email_address):
self.email_address = email_address
def update(self, stock_name, price):
# 在实际应用中,这里会发送真正的邮件
print(f"Sending email to {self.email_address}: {stock_name} stock price updated to {price}")
class SMSNotifier(Observer):
def __init__(self, phone_number):
self.phone_number = phone_number
def update(self, stock_name, price):
# 在实际应用中,这里会发送真正的短信
print(f"Sending SMS to {self.phone_number}: {stock_name} stock price updated to {price}")
class AppNotifier(Observer):
def __init__(self, user_id):
self.user_id = user_id
def update(self, stock_name, price):
# 在实际应用中,这里会发送应用内推送
print(f"Sending in-app notification to user {self.user_id}: {stock_name} stock price updated to {price}")
# 测试代码
def test_stock_market_observers():
# 创建股票市场
market = StockMarket()
# 创建观察者
email_notifier = EmailNotifier("user@example.com")
sms_notifier = SMSNotifier("+1234567890")
app_notifier = AppNotifier("user123")
# 注册观察者
market.register_observer(email_notifier)
market.register_observer(sms_notifier)
market.register_observer(app_notifier)
# 设置股票价格,应该触发通知
print("\nSetting initial stock prices:")
market.set_stock_price("AAPL", 150.0)
market.set_stock_price("GOOGL", 2800.0)
# 更新股票价格,应该触发通知
print("\nUpdating stock prices:")
market.set_stock_price("AAPL", 152.5)
# 移除一个观察者
print("\nRemoving SMS notifier:")
market.remove_observer(sms_notifier)
# 再次更新股票价格,只有剩余的观察者应该收到通知
print("\nUpdating stock prices after removing an observer:")
market.set_stock_price("GOOGL", 2850.0)
# 设置相同的价格,不应该触发通知
print("\nSetting the same price(should not trigger notifications):")
market.set_stock_price("AAPL", 152.5)
if __name__ == "__main__":
test_stock_market_observers()
4.策略模式(Strategy Pattern)
题目描述:
设计一个文本处理系统,支持多种不同的文本压缩算法。系统应该能够在运行时动态选择不同的压缩策略,如 Gzip、Bzip2、LZMA 等。
要求:
1.设计一个 CompressionStrategy
接口,定义压缩和解压方法
2.实现至少三种不同的压缩策略:GzipCompression
、Bzip2Compression
和 LZMACompression
3.实现一个 TextProcessor
类,使用策略模式处理文本压缩
4.编写测试代码,验证系统能够使用不同的压缩策略处理文本
代码框架:
from abc import ABC, abstractmethod
class CompressionStrategy(ABC):
@abstractmethod
def compress(self, data):
"""压缩数据"""
pass
@abstractmethod
def decompress(self, data):
"""解压数据"""
pass
class GzipCompression(CompressionStrategy):
# 实现Gzip压缩策略
pass
class Bzip2Compression(CompressionStrategy):
# 实现Bzip2压缩策略
pass
class LZMACompression(CompressionStrategy):
# 实现LZMA压缩策略
pass
class TextProcessor:
# 实现文本处理器,使用策略模式处理文本压缩
pass
# 测试代码
def test_compression_strategies():
# 测试不同的压缩策略
pass
参考答案:
from abc import ABC, abstractmethod
import gzip
import bz2
import lzma
import os
class CompressionStrategy(ABC):
@abstractmethod
def compress(self, data):
"""压缩数据"""
pass
@abstractmethod
def decompress(self, data):
"""解压数据"""
pass
class GzipCompression(CompressionStrategy):
def compress(self, data):
if isinstance(data, str):
data = data.encode('utf-8')
return gzip.compress(data)
def decompress(self, data):
decompressed_data = gzip.decompress(data)
try:
# 尝试将字节转换为字符串
return decompressed_data.decode('utf-8')
except UnicodeDecodeError:
# 如果无法解码,则返回原始字节
return decompressed_data
class Bzip2Compression(CompressionStrategy):
def compress(self, data):
if isinstance(data, str):
data = data.encode('utf-8')
return bz2.compress(data)
def decompress(self, data):
decompressed_data = bz2.decompress(data)
try:
return decompressed_data.decode('utf-8')
except UnicodeDecodeError:
return decompressed_data
class LZMACompression(CompressionStrategy):
def compress(self, data):
if isinstance(data, str):
data = data.encode('utf-8')
return lzma.compress(data)
def decompress(self, data):
decompressed_data = lzma.decompress(data)
try:
return decompressed_data.decode('utf-8')
except UnicodeDecodeError:
return decompressed_data
class TextProcessor:
def __init__(self, compression_strategy=None):
self.compression_strategy = compression_strategy
def set_compression_strategy(self, compression_strategy):
"""设置压缩策略"""
self.compression_strategy = compression_strategy
def compress_text(self, text):
"""使用当前策略压缩文本"""
if not self.compression_strategy:
raise ValueError("Compression strategy not set")
return self.compression_strategy.compress(text)
def decompress_text(self, compressed_data):
"""使用当前策略解压文本"""
if not self.compression_strategy:
raise ValueError("Compression strategy not set")
return self.compression_strategy.decompress(compressed_data)
def compress_file(self, input_file_path, output_file_path):
"""压缩文件"""
if not self.compression_strategy:
raise ValueError("Compression strategy not set")
with open(input_file_path, 'rb') as input_file:
data = input_file.read()
compressed_data = self.compression_strategy.compress(data)
with open(output_file_path, 'wb') as output_file:
output_file.write(compressed_data)
def decompress_file(self, input_file_path, output_file_path):
"""解压文件"""
if not self.compression_strategy:
raise ValueError("Compression strategy not set")
with open(input_file_path, 'rb') as input_file:
compressed_data = input_file.read()
decompressed_data = self.compression_strategy.decompress(compressed_data)
# 根据解压结果类型决定写入模式
mode = 'w' if isinstance(decompressed_data, str) else 'wb'
with open(output_file_path, mode) as output_file:
output_file.write(decompressed_data)
# 测试代码
def test_compression_strategies():
# 创建测试文本
test_text = "This is a test text that will be compressed using different strategies." * 100
# 创建测试文件
with open('test_input.txt', 'w') as file:
file.write(test_text)
# 创建压缩策略
gzip_strategy = GzipCompression()
bzip2_strategy = Bzip2Compression()
lzma_strategy = LZMACompression()
# 创建文本处理器
processor = TextProcessor()
# 测试Gzip压缩
processor.set_compression_strategy(gzip_strategy)
gzip_compressed = processor.compress_text(test_text)
gzip_decompressed = processor.decompress_text(gzip_compressed)
processor.compress_file('test_input.txt', 'test_gzip.gz')
processor.decompress_file('test_gzip.gz', 'test_gzip_decompressed.txt')
# 测试 Bzip2 压缩
processor.set_compression_strategy(bzip2_strategy)
bzip2_compressed = processor.compress_text(test_text)
bzip2_decompressed = processor.decompress_text(bzip2_compressed)
processor.compress_file('test_input.txt', 'test_bzip2.bz2')
processor.decompress_file('test_bzip2.bz2', 'test_bzip2_decompressed.txt')
# 测试 LZMA 压缩
processor.set_compression_strategy(lzma_strategy)
lzma_compressed = processor.compress_text(test_text)
lzma_decompressed = processor.decompress_text(lzma_compressed)
processor.compress_file('test_input.txt', 'test_lzma.xz')
processor.decompress_file('test_lzma.xz', 'test_lzma_decompressed.txt')
# 打印压缩结果
print(f"Original text size: {len(test_text)} bytes")
print(f"Gzip compressed size: {len(gzip_compressed)} bytes")
print(f"Bzip2 compressed size: {len(bzip2_compressed)} bytes")
print(f"LZMA compressed size: {len(lzma_compressed)} bytes")
# 验证解压结果
assert gzip_decompressed == test_text
assert bzip2_decompressed == test_text
assert lzma_decompressed == test_text
print("All decompression tests passed!")
# 验证文件压缩和解压
with open('test_gzip_decompressed.txt', 'r') as file:
assert file.read() == test_text
with open('test_bzip2_decompressed.txt', 'r') as file:
assert file.read() == test_text
with open('test_lzma_decompressed.txt', 'r') as file:
assert file.read() == test_text
print("All file compression/decompression tests passed!")
# 清理测试文件
for file_name in ['test_input.txt', 'test_gzip.gz', 'test_gzip_decompressed.txt',
'test_bzip2.bz2', 'test_bzip2_decompressed.txt',
'test_lzma.xz', 'test_lzma_decompressed.txt']:
if os.path.exists(file_name):
os.remove(file_name)
if __name__ == "__main__":
test_compression_strategies()
5.装饰器模式(Decorator Pattern)
题目描述:
设计一个咖啡订购系统,允许顾客选择基础咖啡类型(如美式咖啡、拿铁等),然后添加各种配料(如牛奶、糖、巧克力等)。系统需要能够动态计算最终价格和描述。
要求:
1.设计一个Coffee
抽象基类,定义获取价格和描述的方法
2.实现几种基础咖啡类型,如Espresso
、Americano
和Latte
3.设计一个CoffeeDecorator
抽象装饰器类
4.实现几种配料装饰器,如MilkDecorator
、SugarDecorator
和ChocolateDecorator
5.编写测试代码,验证系统能够正确计算添加不同配料的咖啡价格和描述
代码框架:
from abc import ABC, abstractmethod
class Coffee(ABC):
@abstractmethod
def get_cost(self):
"""获取咖啡价格"""
pass
@abstractmethod
def get_description(self):
"""获取咖啡描述"""
pass
class Espresso(Coffee):
# 实现浓缩咖啡
pass
class Americano(Coffee):
# 实现美式咖啡
pass
class Latte(Coffee):
# 实现拿铁咖啡
pass
class CoffeeDecorator(Coffee):
# 实现咖啡装饰器抽象类
pass
class MilkDecorator(CoffeeDecorator):
# 实现牛奶装饰器
pass
class SugarDecorator(CoffeeDecorator):
# 实现糖装饰器
pass
class ChocolateDecorator(CoffeeDecorator):
# 实现巧克力
参考答案:
from abc import ABC, abstractmethod
class Coffee(ABC):
@abstractmethod
def get_cost(self):
"""获取咖啡价格"""
pass
@abstractmethod
def get_description(self):
"""获取咖啡描述"""
pass
class Espresso(Coffee):
def get_cost(self):
return 1.99
def get_description(self):
return "Espresso"
class Americano(Coffee):
def get_cost(self):
return 2.49
def get_description(self):
return "Americano"
class Latte(Coffee):
def get_cost(self):
return 2.99
def get_description(self):
return "Latte"
class CoffeeDecorator(Coffee):
def __init__(self, coffee):
self.coffee = coffee
def get_cost(self):
return self.coffee.get_cost()
def get_description(self):
return self.coffee.get_description()
class MilkDecorator(CoffeeDecorator):
def get_cost(self):
return self.coffee.get_cost() + 0.50
def get_description(self):
return f"{self.coffee.get_description()}, Milk"
class SugarDecorator(CoffeeDecorator):
def get_cost(self):
return self.coffee.get_cost() + 0.25
def get_description(self):
return f"{self.coffee.get_description()}, Sugar"
class ChocolateDecorator(CoffeeDecorator):
def get_cost(self):
return self.coffee.get_cost() + 0.75
def get_description(self):
return f"{self.coffee.get_description()}, Chocolate"
class WhippedCreamDecorator(CoffeeDecorator):
def get_cost(self):
return self.coffee.get_cost() + 0.60
def get_description(self):
return f"{self.coffee.get_description()}, Whipped Cream"
# 测试代码
def test_coffee_decorators():
# 创建基础咖啡
espresso = Espresso()
americano = Americano()
latte = Latte()
# 测试基础咖啡
print(f"{espresso.get_description()}: ${espresso.get_cost():.2f}")
print(f"{americano.get_description()}: ${americano.get_cost():.2f}")
print(f"{latte.get_description()}: ${latte.get_cost():.2f}")
# 添加配料
espresso_with_milk = MilkDecorator(espresso)
print(f"{espresso_with_milk.get_description()}: ${espresso_with_milk.get_cost():.2f}")
americano_with_sugar = SugarDecorator(americano)
print(f"{americano_with_sugar.get_description()}: ${americano_with_sugar.get_cost():.2f}")
# 多重装饰
special_latte = ChocolateDecorator(WhippedCreamDecorator(MilkDecorator(latte)))
print(f"{special_latte.get_description()}: ${special_latte.get_cost():.2f}")
# 创建自定义咖啡
custom_coffee = SugarDecorator(MilkDecorator(ChocolateDecorator(Espresso())))
print(f"Custom coffee: {custom_coffee.get_description()}: ${custom_coffee.get_cost():.2f}")
if __name__ == "__main__":
test_coffee_decorators()
6.适配器模式(Adapter Pattern)
题目描述:
你正在开发一个支持多种支付方式的电子商务系统。系统已经有一个标准的支付接口,但现在需要集成一个第三方支付 API,其接口与系统现有接口不兼容。请使用适配器模式解决这个问题。
要求:
1.系统有一个现有的 PaymentProcessor
接口,定义了 process_payment
方法
2.已经实现了几种支付处理器,如 CreditCardProcessor
和 PayPalProcessor
3.第三方支付 API 有一个 ThirdPartyPaymentAPI
类,提供了 authorize_payment
和 capture_payment
方法
4.实现一个适配器 ThirdPartyPaymentAdapter
,使第三方 API 符合系统的支付接口
5.编写测试代码,验证适配器能够正确工作
代码框架:
from abc import ABC, abstractmethod
class PaymentProcessor(ABC):
@abstractmethod
def process_payment(self, amount):
"""处理支付"""
pass
class CreditCardProcessor(PaymentProcessor):
# 实现信用卡支付处理器
pass
class PayPalProcessor(PaymentProcessor):
# 实现PayPal支付处理器
pass
class ThirdPartyPaymentAPI:
# 第三方支付API,接口与系统不兼容
def authorize_payment(self, amount):
# 授权支付
pass
def capture_payment(self, authorization_id):
# 捕获支付
pass
class ThirdPartyPaymentAdapter(PaymentProcessor):
# 实现适配器,使第三方API符合系统接口
pass
# 测试代码
def test_payment_adapter():
# 测试适配器是否能正确处理支付
pass
参考答案:
from abc import ABC, abstractmethod
import uuid
class PaymentProcessor(ABC):
@abstractmethod
def process_payment(self, amount):
"""处理支付"""
pass
class CreditCardProcessor(PaymentProcessor):
def __init__(self, card_number, expiry_date, cvv):
self.card_number = card_number
self.expiry_date = expiry_date
self.cvv = cvv
def process_payment(self, amount):
# 在实际应用中,这里会调用信用卡处理API
print(f"Processing credit card payment of ${amount:.2f}")
print(f"Card details: {self.card_number[-4:]}(expires: {self.expiry_date})")
return {"success": True, "transaction_id": str(uuid.uuid4())}
class PayPalProcessor(PaymentProcessor):
def __init__(self, email):
self.email = email
def process_payment(self, amount):
# 在实际应用中,这里会调用PayPal API
print(f"Processing PayPal payment of ${amount:.2f}")
print(f"PayPal account: {self.email}")
return {"success": True, "transaction_id": str(uuid.uuid4())}
class ThirdPartyPaymentAPI:
def __init__(self, api_key):
self.api_key = api_key
def authorize_payment(self, amount):
# 在实际应用中,这里会调用第三方API进行授权
print(f"Third-party API: Authorizing payment of ${amount:.2f}")
print(f"Using API key: {self.api_key[:5]}...")
# 返回授权ID
return {"authorization_id": str(uuid.uuid4()), "status": "authorized"}
def capture_payment(self, authorization_id):
# 在实际应用中,这里会调用第三方API捕获已授权的支付
print(f"Third-party API: Capturing payment with authorization ID: {authorization_id}")
# 返回交易ID
return {"transaction_id": str(uuid.uuid4()), "status": "completed"}
class ThirdPartyPaymentAdapter(PaymentProcessor):
def __init__(self, third_party_api):
self.third_party_api = third_party_api
def process_payment(self, amount):
# 使用适配器模式将第三方API的两步流程适配为系统的单步流程
# 1.首先授权支付
auth_result = self.third_party_api.authorize_payment(amount)
if auth_result["status"] != "authorized":
return {"success": False, "error": "Payment authorization failed"}
# 2.然后捕获支付
capture_result = self.third_party_api.capture_payment(auth_result["authorization_id"])
if capture_result["status"] != "completed":
return {"success": False, "error": "Payment capture failed"}
# 3.返回符合系统接口的结果
return {"success": True, "transaction_id": capture_result["transaction_id"]}
# 测试代码
def test_payment_adapter():
# 创建支付处理器
credit_card = CreditCardProcessor("4111-1111-1111-1111", "12/25", "123")
paypal = PayPalProcessor("customer@example.com")
# 创建第三方API和适配器
third_party_api = ThirdPartyPaymentAPI("api_key_12345abcde")
third_party_adapter = ThirdPartyPaymentAdapter(third_party_api)
# 测试信用卡支付
print("\n=== Testing Credit Card Payment ===")
cc_result = credit_card.process_payment(100.00)
print(f"Result: {cc_result}")
# 测试PayPal支付
print("\n=== Testing PayPal Payment ===")
paypal_result = paypal.process_payment(75.50)
print(f"Result: {paypal_result}")
# 测试第三方API适配器
print("\n=== Testing Third-party Payment Adapter ===")
adapter_result = third_party_adapter.process_payment(120.75)
print(f"Result: {adapter_result}")
# 验证所有支付处理器都实现了相同的接口
processors = [credit_card, paypal, third_party_adapter]
print("\n=== Verifying Common Interface ===")
for i, processor in enumerate(processors, 1):
print(f"Processor {i} is a PaymentProcessor: {isinstance(processor, PaymentProcessor)}")
result = processor.process_payment(50.00)
print(f"Processor {i} result has 'success' key: {'success' in result}")
print(f"Processor {i} result has 'transaction_id' key: {'transaction_id' in result}")
if __name__ == "__main__":
test_payment_adapter()
7.命令模式(Command Pattern)
题目描述:
设计一个简单的文本编辑器,支持多种操作(如插入文本、删除文本、复制文本等),并能够实现撤销和重做功能。
要求:
1.设计一个 Command
接口,定义执行和撤销方法
2.实现几种具体命令,如 InsertTextCommand
、DeleteTextCommand
和 CopyTextCommand
3.实现一个 TextEditor
类作为接收者,提供基本的文本操作
4.实现一个 EditorInvoker
类,管理命令的执行、撤销和重做
5.编写测试代码,验证编辑器能够正确执行各种操作,并支持撤销和重做
代码框架:
from abc import ABC, abstractmethod
class Command(ABC):
@abstractmethod
def execute(self):
"""执行命令"""
pass
@abstractmethod
def undo(self):
"""撤销命令"""
pass
class TextEditor:
# 实现文本编辑器,提供基本的文本操作
pass
class InsertTextCommand(Command):
# 实现插入文本命令
pass
class DeleteTextCommand(Command):
# 实现删除文本命令
pass
class CopyTextCommand(Command):
# 实现复制文本命令
pass
class EditorInvoker:
# 实现编辑器调用者,管理命令的执行、撤销和重做
pass
# 测试代码
def test_text_editor_commands():
# 测试编辑器的各种操作和撤销/重做功能
pass
参考答案:
from abc import ABC, abstractmethod
class Command(ABC):
@abstractmethod
def execute(self):
"""执行命令"""
pass
@abstractmethod
def undo(self):
"""撤销命令"""
pass
class TextEditor:
def __init__(self):
self.text = ""
self.clipboard = ""
self.cursor_position = 0
def insert_text(self, position, text):
"""在指定位置插入文本"""
if position < 0 or position > len(self.text):
raise ValueError("Invalid position")
self.text = self.text[:position] + text + self.text[position:]
self.cursor_position = position + len(text)
return text
def delete_text(self, start, end):
"""删除指定范围的文本"""
if start < 0 or end > len(self.text) or start > end:
raise ValueError("Invalid range")
deleted_text = self.text[start:end]
self.text = self.text[:start] + self.text[end:]
self.cursor_position = start
return deleted_text
def copy_text(self, start, end):
"""复制指定范围的文本到剪贴板"""
if start < 0 or end > len(self.text) or start > end:
raise ValueError("Invalid range")
self.clipboard = self.text[start:end]
self.cursor_position = end
return self.clipboard
def paste_text(self, position):
"""在指定位置粘贴剪贴板内容"""
if position < 0 or position > len(self.text):
raise ValueError("Invalid position")
return self.insert_text(position, self.clipboard)
def get_text(self):
"""获取当前文本内容"""
return self.text
def get_cursor_position(self):
"""获取当前光标位置"""
return self.cursor_position
def get_clipboard(self):
"""获取剪贴板内容"""
return self.clipboard
class InsertTextCommand(Command):
def __init__(self, editor, position, text):
self.editor = editor
self.position = position
self.text = text
self.inserted_text = None
def execute(self):
self.inserted_text = self.editor.insert_text(self.position, self.text)
return True
def undo(self):
if self.inserted_text:
self.editor.delete_text(self.position, self.position + len(self.inserted_text))
return True
return False
class DeleteTextCommand(Command):
def __init__(self, editor, start, end):
self.editor = editor
self.start = start
self.end = end
self.deleted_text = None
def execute(self):
self.deleted_text = self.editor.delete_text(self.start, self.end)
return True
def undo(self):
if self.deleted_text:
self.editor.insert_text(self.start, self.deleted_text)
return True
return False
class CopyTextCommand(Command):
def __init__(self, editor, start, end):
self.editor = editor
self.start = start
self.end = end
self.previous_clipboard = None
def execute(self):
self.previous_clipboard = self.editor.get_clipboard()
self.editor.copy_text(self.start, self.end)
return True
def undo(self):
# 恢复之前的剪贴板内容
if self.previous_clipboard is not None:
self.editor.clipboard = self.previous_clipboard
return True
return False
class PasteTextCommand(Command):
def __init__(self, editor, position):
self.editor = editor
self.position = position
self.pasted_text = None
def execute(self):
self.pasted_text = self.editor.paste_text(self.position)
return True
def undo(self):
if self.pasted_text:
self.editor.delete_text(self.position, self.position + len(self.pasted_text))
return True
return False
class EditorInvoker:
def __init__(self):
self.command_history = []
self.redo_stack = []
def execute_command(self, command):
"""执行命令并添加到历史记录"""
if command.execute():
self.command_history.append(command)
self.redo_stack.clear() # 执行新命令后清空重做栈
return True
return False
def undo(self):
"""撤销最后一个命令"""
if self.command_history:
command = self.command_history.pop()
if command.undo():
self.redo_stack.append(command)
return True
return False
def redo(self):
"""重做最后一个撤销的命令"""
if self.redo_stack:
command = self.redo_stack.pop()
if command.execute():
self.command_history.append(command)
return True
return False
def get_history_size(self):
"""获取历史记录大小"""
return len(self.command_history)
def get_redo_stack_size(self):
"""获取重做栈大小"""
return len(self.redo_stack)
# 测试代码
def test_text_editor_commands():
# 创建编辑器和调用者
editor = TextEditor()
invoker = EditorInvoker()
# 测试插入命令
print("=== Testing Insert Command ===")
insert_cmd = InsertTextCommand(editor, 0, "Hello, ")
invoker.execute_command(insert_cmd)
print(f"Text after insert: '{editor.get_text()}'")
insert_cmd2 = InsertTextCommand(editor, 7, "world!")
invoker.execute_command(insert_cmd2)
print(f"Text after second insert: '{editor.get_text()}'")
# 测试复制命令
print("\n=== Testing Copy Command ===")
copy_cmd = CopyTextCommand(editor, 7, 12)
invoker.execute_command(copy_cmd)
print(f"Clipboard after copy: '{editor.get_clipboard()}'")
# 测试粘贴命令
print("\n=== Testing Paste Command ===")
paste_cmd = PasteTextCommand(editor, 13)
invoker.execute_command(paste_cmd)
print(f"Text after paste: '{editor.get_text()}'")
# 测试删除命令
print("\n=== Testing Delete Command ===")
delete_cmd = DeleteTextCommand(editor, 7, 12)
invoker.execute_command(delete_cmd)
print(f"Text after delete: '{editor.get_text()}'")
# 测试撤销
print("\n=== Testing Undo ===")
invoker.undo() # 撤销删除
print(f"Text after undo delete: '{editor.get_text()}'")
invoker.undo() # 撤销粘贴
print(f"Text after undo paste: '{editor.get_text()}'")
# 测试重做
print("\n=== Testing Redo ===")
invoker.redo() # 重做粘贴
print(f"Text after redo paste: '{editor.get_text()}'")
# 测试多次撤销和重做
print("\n=== Testing Multiple Undo/Redo ===")
invoker.undo() # 撤销粘贴
invoker.undo() # 撤销复制
invoker.undo() # 撤销第二次插入
print(f"Text after multiple undos: '{editor.get_text()}'")
invoker.redo() # 重做第二次插入
print(f"Text after redo insert: '{editor.get_text()}'")
# 测试在撤销后执行新命令
print("\n=== Testing New Command After Undo ===")
insert_cmd3 = InsertTextCommand(editor, 7, "Python!")
invoker.execute_command(insert_cmd3)
print(f"Text after new insert: '{editor.get_text()}'")
print(f"Redo stack size(should be 0): {invoker.get_redo_stack_size()}")
if __name__ == "__main__":
test_text_editor_commands()
8.组合模式(Composite Pattern)
题目描述:
设计一个文件系统结构,支持文件和目录的层次结构。系统需要能够计算目录大小(包括所有子文件和子目录的大小总和),并支持搜索文件。
要求:
1.设计一个 FileSystemComponent
抽象基类,定义获取名称、大小和搜索方法
2.实现 File
类作为叶节点,表示单个文件
3.实现 Directory
类作为组合节点,可以包含文件和子目录
4.目录的大小应该是其包含的所有文件和子目录大小的总和
5.实现搜索功能,能够在整个文件系统中查找指定名称的文件
6.编写测试代码,验证系统能够正确计算目录大小和搜索文件
代码框架:
from abc import ABC, abstractmethod
class FileSystemComponent(ABC):
@abstractmethod
def get_name(self):
"""获取名称"""
pass
@abstractmethod
def get_size(self):
"""获取大小"""
pass
@abstractmethod
def search(self, name):
"""搜索文件"""
pass
class File(FileSystemComponent):
# 实现文件类
pass
class Directory(FileSystemComponent):
# 实现目录类
pass
# 测试代码
def test_file_system():
# 测试文件系统的目录大小计算和文件搜索功能
pass
参考答案:
from abc import ABC, abstractmethod
class FileSystemComponent(ABC):
def __init__(self, name):
self.name = name
def get_name(self):
"""获取名称"""
return self.name
@abstractmethod
def get_size(self):
"""获取大小"""
pass
@abstractmethod
def search(self, name):
"""搜索文件"""
pass
@abstractmethod
def print_structure(self, indent=""):
"""打印结构"""
pass
class File(FileSystemComponent):
def __init__(self, name, size):
super().__init__(name)
self.size = size
def get_size(self):
return self.size
def search(self, name):
"""搜索文件,如果名称匹配则返回自身,否则返回空列表"""
if name in self.name:
return [self]
return []
def print_structure(self, indent=""):
"""打印文件结构"""
print(f"{indent}- {self.name}({self.size} bytes)")
class Directory(FileSystemComponent):
def __init__(self, name):
super().__init__(name)
self.children = []
def add(self, component):
"""添加子组件(文件或目录)"""
self.children.append(component)
def remove(self, component):
"""移除子组件"""
self.children.remove(component)
def get_size(self):
"""计算目录大小(所有子组件大小的总和)"""
total_size = 0
for child in self.children:
total_size += child.get_size()
return total_size
def search(self, name):
"""搜索文件,返回所有匹配的文件列表"""
result = []
# 如果目录名称匹配,也将其添加到结果中
if name in self.name:
result.append(self)
# 在所有子组件中搜索
for child in self.children:
result.extend(child.search(name))
return result
def print_structure(self, indent=""):
"""打印目录结构"""
print(f"{indent}+ {self.name}({self.get_size()} bytes)")
for child in self.children:
child.print_structure(indent + " ")
# 测试代码
def test_file_system():
# 创建根目录
root = Directory("root")
# 创建子目录
documents = Directory("documents")
pictures = Directory("pictures")
# 添加子目录到根目录
root.add(documents)
root.add(pictures)
# 创建文件并添加到目录
file1 = File("report.docx", 2048)
file2 = File("presentation.pptx", 4096)
documents.add(file1)
documents.add(file2)
file3 = File("vacation.jpg", 3072)
file4 = File("family.jpg", 2560)
pictures.add(file3)
pictures.add(file4)
# 创建子目录并添加文件
work = Directory("work")
documents.add(work)
file5 = File("project_plan.docx", 1536)
work.add(file5)
# 打印文件系统结构
print("=== File System Structure ===")
root.print_structure()
# 测试大小计算
print("\n=== Size Calculations ===")
print(f"Size of root: {root.get_size()} bytes")
print(f"Size of documents: {documents.get_size()} bytes")
print(f"Size of pictures: {pictures.get_size()} bytes")
print(f"Size of work: {work.get_size()} bytes")
# 测试文件搜索
print("\n=== File Search ===")
search_term = "docx"
results = root.search(search_term)
print(f"Search results for '{search_term}': ")
for item in results:
print(f"- {item.get_name()}({item.get_size()} bytes)")
search_term = "jpg"
results = root.search(search_term)
print(f"\nSearch results for '{search_term}': ")
for item in results:
print(f"- {item.get_name()}({item.get_size()} bytes)")
search_term = "work"
results = root.search(search_term)
print(f"\nSearch results for '{search_term}': ")
for item in results:
print(f"- {item.get_name()}({item.get_size()} bytes)")
# 测试移除组件
print("\n=== Removing Components ===")
documents.remove(file1)
print("Removed report.docx from documents")
root.print_structure()
if __name__ == "__main__":
test_file_system()
9.状态模式(State Pattern)
题目描述:
设计一个自动售货机系统,根据不同的状态(如无货、有货、已投币、已选择商品等)展示不同的行为。
要求:
1.设计一个 VendingMachineState
接口,定义各种操作方法
2.实现几种具体状态类,如 NoItemState
、HasItemState
、HasCoinState
和 DispenseState
3.实现一个 VendingMachine
类,根据当前状态执行不同的行为
4.状态应该能够自动转换,如投币后从 HasItemState
转换到 HasCoinState
5.编写测试代码,验证售货机在不同状态下的行为是否正确
代码框架:
from abc import ABC, abstractmethod
class VendingMachineState(ABC):
@abstractmethod
def insert_coin(self, machine):
"""投入硬币"""
pass
@abstractmethod
def select_item(self, machine, item_code):
"""选择商品"""
pass
@abstractmethod
def dispense_item(self, machine):
"""发放商品"""
pass
@abstractmethod
def return_coin(self, machine):
"""退回硬币"""
pass
class NoItemState(VendingMachineState):
# 实现无货状态
pass
class HasItemState(VendingMachineState):
# 实现有货状态
pass
class HasCoinState(VendingMachineState):
# 实现已投币状态
pass
class DispenseState(VendingMachineState):
# 实现发放商品状态
pass
class VendingMachine:
# 实现自动售货机,根据当前状态执行不同行为
pass
# 测试代码
def test_vending_machine():
# 测试售货机在不同状态下的行为
pass
参考答案:
from abc import ABC, abstractmethod
class VendingMachineState(ABC):
@abstractmethod
def insert_coin(self, machine):
"""投入硬币"""
pass
@abstractmethod
def select_item(self, machine, item_code):
"""选择商品"""
pass
@abstractmethod
def dispense_item(self, machine):
"""发放商品"""
pass
@abstractmethod
def return_coin(self, machine):
"""退回硬币"""
pass
class NoItemState(VendingMachineState):
def insert_coin(self, machine):
print("无法接受硬币,售货机内没有商品")
return False
def select_item(self, machine, item_code):
print("无法选择商品,售货机内没有商品")
return False
def dispense_item(self, machine):
print("无法发放商品,售货机内没有商品")
return False
def return_coin(self, machine):
print("没有硬币可退回")
return False
def __str__(self):
return "无货状态"
class HasItemState(VendingMachineState):
def insert_coin(self, machine):
print("已接受硬币")
machine.set_state(machine.has_coin_state)
return True
def select_item(self, machine, item_code):
print("请先投入硬币")
return False
def dispense_item(self, machine):
print("请先投入硬币并选择商品")
return False
def return_coin(self, machine):
print("没有硬币可退回")
return False
def __str__(self):
return "有货状态"
class HasCoinState(VendingMachineState):
def insert_coin(self, machine):
print("已经投入了硬币,请选择商品或退回硬币")
return False
def select_item(self, machine, item_code):
if item_code in machine.items and machine.items[item_code]["count"] > 0:
machine.selected_item = item_code
print(f"已选择商品: {machine.items[item_code]['name']}")
machine.set_state(machine.dispense_state)
return True
else:
print(f"商品编号 {item_code} 无效或已售罄")
return False
def dispense_item(self, machine):
print("请先选择商品")
return False
def return_coin(self, machine):
print("退回硬币")
machine.set_state(machine.has_item_state)
return True
def __str__(self):
return "已投币状态"
class DispenseState(VendingMachineState):
def insert_coin(self, machine):
print("正在发放商品,请稍等")
return False
def select_item(self, machine, item_code):
print("正在发放商品,请稍等")
return False
def dispense_item(self, machine):
if machine.selected_item and machine.selected_item in machine.items:
item = machine.items[machine.selected_item]
if item["count"] > 0:
item["count"] -= 1
print(f"发放商品: {item['name']}")
# 检查是否还有商品
if machine.get_total_items() > 0:
machine.set_state(machine.has_item_state)
else:
machine.set_state(machine.no_item_state)
machine.selected_item = None
return True
print("发放商品失败")
machine.set_state(machine.has_item_state)
return False
def return_coin(self, machine):
print("正在发放商品,无法退回硬币")
return False
def __str__(self):
return "发放商品状态"
class VendingMachine:
def __init__(self):
# 初始化状态
self.no_item_state = NoItemState()
self.has_item_state = HasItemState()
self.has_coin_state = HasCoinState()
self.dispense_state = DispenseState()
# 默认为无货状态
self.state = self.no_item_state
# 商品信息
self.items = {}
self.selected_item = None
def set_state(self, state):
"""设置当前状态"""
self.state = state
print(f"售货机状态变更为: {state}")
def add_item(self, code, name, price, count):
"""添加商品"""
self.items[code] = {
"name": name,
"price": price,
"count": count
}
print(f"添加商品: {name}, 价格: {price}, 数量: {count}")
# 如果从无货变为有货,更新状态
if self.state == self.no_item_state and self.get_total_items() > 0:
self.set_state(self.has_item_state)
def get_total_items(self):
"""获取商品总数"""
return sum(item["count"] for item in self.items.values())
def insert_coin(self):
"""投入硬币"""
return self.state.insert_coin(self)
def select_item(self, item_code):
"""选择商品"""
return self.state.select_item(self, item_code)
def dispense_item(self):
"""发放商品"""
return self.state.dispense_item(self)
def return_coin(self):
"""退回硬币"""
return self.state.return_coin(self)
def get_current_state(self):
"""获取当前状态"""
return str(self.state)
# 测试代码
def test_vending_machine():
# 创建售货机
machine = VendingMachine()
# 测试初始状态(无货)
print("\n=== 测试初始状态(无货)===")
print(f"当前状态: {machine.get_current_state()}")
machine.insert_coin() # 应该失败
machine.select_item("A1") # 应该失败
machine.dispense_item() # 应该失败
machine.return_coin() # 应该失败
# 添加商品
print("\n=== 添加商品 ===")
machine.add_item("A1", "可乐", 2.5, 5)
machine.add_item("A2", "薯片", 3.0, 3)
machine.add_item("B1", "巧克力", 4.0, 2)
print(f"当前状态: {machine.get_current_state()}")
# 测试有货状态
print("\n=== 测试有货状态 ===")
machine.select_item("A1") # 应该失败,需要先投币
machine.dispense_item() # 应该失败,需要先投币和选择商品
# 投入硬币
print("\n=== 投入硬币 ===")
machine.insert_coin() # 应该成功
print(f"当前状态: {machine.get_current_state()}")
# 测试已投币状态
print("\n=== 测试已投币状态 ===")
machine.insert_coin() # 应该失败,已经投入了硬币
# 选择商品
print("\n=== 选择商品 ===")
machine.select_item("A1") # 应该成功
print(f"当前状态: {machine.get_current_state()}")
# 发放商品
print("\n=== 发放商品 ===")
machine.dispense_item() # 应该成功
print(f"当前状态: {machine.get_current_state()}")
# 再次购买
print("\n=== 再次购买 ===")
machine.insert_coin() # 应该成功
machine.select_item("A2") # 应该成功
machine.dispense_item() # 应该成功
# 测试退币
print("\n=== 测试退币 ===")
machine.insert_coin() # 应该成功
print(f"当前状态: {machine.get_current_state()}")
machine.return_coin() # 应该成功
print(f"当前状态: {machine.get_current_state()}")
# 测试商品售罄
print("\n=== 测试商品售罄 ===")
# 购买所有剩余商品
for _ in range(4): # 可乐还剩3个,薯片还剩2个,巧克力还剩2个,总共7个
machine.insert_coin()
if machine.get_current_state() == "已投币状态":
# 尝试购买可乐
if machine.select_item("A1"):
machine.dispense_item()
# 如果可乐售罄,尝试购买薯片
elif machine.select_item("A2"):
machine.dispense_item()
# 如果薯片售罄,尝试购买巧克力
elif machine.select_item("B1"):
machine.dispense_item()
# 再购买3个商品,应该会售罄
for _ in range(3):
machine.insert_coin()
if machine.get_current_state() == "已投币状态":
if machine.select_item("A1"):
machine.dispense_item()
elif machine.select_item("A2"):
machine.dispense_item()
elif machine.select_item("B1"):
machine.dispense_item()
# 检查最终状态
print("\n=== 最终状态 ===")
print(f"当前状态: {machine.get_current_state()}")
machine.insert_coin() # 应该失败,无货
if __name__ == "__main__":
test_vending_machine()
10. 模板方法模式 (Template Method Pattern)
题目描述:
设计一个数据分析系统,支持多种数据处理流程。所有流程都遵循相同的步骤:加载数据、清洗数据、分析数据和生成报告,但每个步骤的具体实现可能不同。
要求:
- 设计一个
DataAnalyzer
抽象基类,定义数据分析的模板方法和各个步骤的抽象方法 - 实现几种具体的数据分析器,如
SalesDataAnalyzer
、WebTrafficAnalyzer
和SocialMediaAnalyzer
- 模板方法应该定义整个分析流程,而具体子类只需实现各个步骤
- 提供钩子方法,允许子类选择性地覆盖某些步骤
- 编写测试代码,验证不同的数据分析器能够正确执行各自的分析流程
代码框架:
from abc import ABC, abstractmethod
class DataAnalyzer(ABC):
def analyze(self):
"""模板方法,定义数据分析的整体流程"""
self.load_data()
self.clean_data()
self.analyze_data()
if self.should_generate_report():
self.generate_report()
@abstractmethod
def load_data(self):
"""加载数据"""
pass
@abstractmethod
def clean_data(self):
"""清洗数据"""
pass
@abstractmethod
def analyze_data(self):
"""分析数据"""
pass
@abstractmethod
def generate_report(self):
"""生成报告"""
pass
def should_generate_report(self):
"""钩子方法,决定是否生成报告"""
return True
class SalesDataAnalyzer(DataAnalyzer):
# 实现销售数据分析器
pass
class WebTrafficAnalyzer(DataAnalyzer):
# 实现网站流量分析器
pass
class SocialMediaAnalyzer(DataAnalyzer):
# 实现社交媒体分析器
pass
# 测试代码
def test_data_analyzers():
# 测试不同的数据分析器
pass
参考答案:
from abc import ABC, abstractmethod
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
import os
class DataAnalyzer(ABC):
def __init__(self, data_source):
self.data_source = data_source
self.data = None
self.results = None
self.report_path = None
def analyze(self):
"""模板方法,定义数据分析的整体流程"""
print(f"\n开始分析 {self.__class__.__name__}")
print(f"数据源: {self.data_source}")
start_time = datetime.now()
self.load_data()
self.clean_data()
self.analyze_data()
if self.should_generate_report():
self.generate_report()
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
print(f"分析完成,耗时: {duration:.2f} 秒")
if self.report_path:
print(f"报告保存在: {self.report_path}")
@abstractmethod
def load_data(self):
"""加载数据"""
pass
@abstractmethod
def clean_data(self):
"""清洗数据"""
pass
@abstractmethod
def analyze_data(self):
"""分析数据"""
pass
@abstractmethod
def generate_report(self):
"""生成报告"""
pass
def should_generate_report(self):
"""钩子方法,决定是否生成报告"""
return True
class SalesDataAnalyzer(DataAnalyzer):
def load_data(self):
print("加载销售数据...")
# 模拟从CSV文件加载销售数据
# 在实际应用中,这里会从真实数据源加载数据
self.data = pd.DataFrame({
'date': pd.date_range(start='2023-01-01', periods=100),
'product': np.random.choice(['A', 'B', 'C', 'D'], size=100),
'region': np.random.choice(['North', 'South', 'East', 'West'], size=100),
'sales': np.random.randint(100, 1000, size=100),
'units': np.random.randint(1, 50, size=100)
})
print(f"加载了 {len(self.data)} 条销售记录")
def clean_data(self):
print("清洗销售数据...")
# 删除缺失值
self.data = self.data.dropna()
# 删除异常值(销售额为0但单位数不为0的记录)
self.data = self.data[~((self.data['sales'] == 0) & (self.data['units'] > 0))]
# 添加每单位销售额列
self.data['price_per_unit'] = self.data['sales'] / self.data['units']
print(f"清洗后剩余 {len(self.data)} 条记录")
def analyze_data(self):
print("分析销售数据...")
# 按产品和地区分组计算销售统计
product_sales = self.data.groupby('product')['sales'].agg(['sum', 'mean', 'count'])
region_sales = self.data.groupby('region')['sales'].agg(['sum', 'mean', 'count'])
# 计算每月销售趋势
self.data['month'] = self.data['date'].dt.to_period('M')
monthly_sales = self.data.groupby('month')['sales'].sum()
self.results = {
'product_sales': product_sales,
'region_sales': region_sales,
'monthly_sales': monthly_sales
}
print("销售数据分析完成")
print("\n产品销售统计:")
print(product_sales)
print("\n地区销售统计:")
print(region_sales)
def generate_report(self):
print("生成销售报告...")
# 创建报告目录
os.makedirs('reports', exist_ok=True)
# 生成报告文件名
report_file = f"reports/sales_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
self.report_path = report_file
# 写入报告
with open(report_file, 'w') as f:
f.write("销售数据分析报告\n")
f.write("=" * 50 + "\n\n")
f.write("产品销售统计:\n")
f.write(str(self.results['product_sales']) + "\n\n")
f.write("地区销售统计:\n")
f.write(str(self.results['region_sales']) + "\n\n")
f.write("月度销售趋势:\n")
f.write(str(self.results['monthly_sales']) + "\n")
# 生成图表
plt.figure(figsize=(12, 8))
# 产品销售额饼图
plt.subplot(2, 2, 1)
product_sales = self.results['product_sales']['sum']
plt.pie(product_sales, labels=product_sales.index, autopct='%1.1f%%')
plt.title('产品销售额占比')
# 地区销售额柱状图
plt.subplot(2, 2, 2)
region_sales = self.results['region_sales']['sum']
plt.bar(region_sales.index, region_sales.values)
plt.title('地区销售额')
# 月度销售趋势线图
plt.subplot(2, 1, 2)
monthly_sales = self.results['monthly_sales']
plt.plot(range(len(monthly_sales)), monthly_sales.values)
plt.xticks(range(len(monthly_sales)), [str(x) for x in monthly_sales.index], rotation=45)
plt.title('月度销售趋势')
plt.tight_layout()
chart_file = f"reports/sales_charts_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
plt.savefig(chart_file)
plt.close()
print(f"销售报告已保存: {report_file}")
print(f"销售图表已保存: {chart_file}")
class WebTrafficAnalyzer(DataAnalyzer):
def load_data(self):
print("加载网站流量数据...")
# 模拟从日志文件加载网站流量数据
self.data = pd.DataFrame({
'timestamp': pd.date_range(start='2023-01-01', periods=1000, freq='10min'),
'ip': [f'192.168.1.{np.random.randint(1, 255)}' for _ in range(1000)],
'url': np.random.choice(['/home', '/products', '/about', '/contact', '/login'], size=1000),
'status_code': np.random.choice([200, 301, 404, 500], size=1000, p=[0.8, 0.1, 0.07, 0.03]),
'user_agent': np.random.choice(['Chrome', 'Firefox', 'Safari', 'Edge'], size=1000),
'response_time': np.random.uniform(0.1, 2.0, size=1000)
})
print(f"加载了 {len(self.data)} 条访问记录")
def clean_data(self):
print("清洗网站流量数据...")
# 删除响应时间异常的记录
self.data = self.data[self.data['response_time'] < 1.5]
# 添加日期和小时列
self.data['date'] = self.data['timestamp'].dt.date
self.data['hour'] = self.data['timestamp'].dt.hour
print(f"清洗后剩余 {len(self.data)} 条记录")
def analyze_data(self):
print("分析网站流量数据...")
# 计算每日访问量
daily_visits = self.data.groupby('date').size()
# 计算每小时访问量
hourly_visits = self.data.groupby('hour').size()
# 计算页面访问统计
page_visits = self.data.groupby('url').size().sort_values(ascending=False)
# 计算状态码统计
status_stats = self.data.groupby('status_code').size()
# 计算平均响应时间
avg_response_time = self.data.groupby('url')['response_time'].mean()
self.results = {
'daily_visits': daily_visits,
'hourly_visits': hourly_visits,
'page_visits': page_visits,
'status_stats': status_stats,
'avg_response_time': avg_response_time
}
print("网站流量分析完成")
print("\n页面访问统计:")
print(page_visits)
print("\n状态码统计:")
print(status_stats)
def generate_report(self):
print("生成网站流量报告...")
# 创建报告目录
os.makedirs('reports', exist_ok=True)
# 生成报告文件名
report_file = f"reports/web_traffic_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
self.report_path = report_file
# 写入报告
with open(report_file, 'w') as f:
f.write("网站流量分析报告\n")
f.write("=" * 50 + "\n\n")
f.write("每日访问量:\n")
f.write(str(self.results['daily_visits'].tail(5)) + "\n\n")
f.write("每小时访问量:\n")
f.write(str(self.results['hourly_visits']) + "\n\n")
f.write("页面访问统计:\n")
f.write(str(self.results['page_visits']) + "\n\n")
f.write("状态码统计:\n")
f.write(str(self.results['status_stats']) + "\n\n")
f.write("平均响应时间:\n")
f.write(str(self.results['avg_response_time']) + "\n")
print(f"网站流量报告已保存: {report_file}")
class SocialMediaAnalyzer(DataAnalyzer):
def load_data(self):
print("加载社交媒体数据...")
# 模拟从API加载社交媒体数据
self.data = pd.DataFrame({
'post_id': range(1, 201),
'timestamp': pd.date_range(start='2023-01-01', periods=200),
'content': [f'Post content {i}' for i in range(1, 201)],
'likes': np.random.randint(0, 1000, size=200),
'shares': np.random.randint(0, 200, size=200),
'comments': np.random.randint(0, 100, size=200),
'user_id': np.random.randint(1, 50, size=200)
})
print(f"加载了 {len(self.data)} 条社交媒体记录")
def clean_data(self):
print("清洗社交媒体数据...")
# 计算互动总数
self.data['engagement'] = self.data['likes'] + self.data['shares'] * 2 + self.data['comments'] * 3
# 添加日期列
self.data['date'] = self.data['timestamp'].dt.date
# 添加星期几列
self.data['day_of_week'] = self.data['timestamp'].dt.day_name()
print("社交媒体数据清洗完成")
def analyze_data(self):
print("分析社交媒体数据...")
# 计算每日互动统计
daily_engagement = self.data.groupby('date')['engagement'].sum()
# 计算星期几互动统计
day_of_week_engagement = self.data.groupby('day_of_week')['engagement'].mean().reindex([
'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'
])
# 计算互动最高的帖子
top_posts = self.data.sort_values('engagement', ascending=False).head(10)
# 计算用户互动统计
user_engagement = self.data.groupby('user_id')['engagement'].sum().sort_values(ascending=False)
self.results = {
'daily_engagement': daily_engagement,
'day_of_week_engagement': day_of_week_engagement,
'top_posts': top_posts,
'user_engagement': user_engagement
}
print("社交媒体数据分析完成")
print("\n星期几互动统计:")
print(day_of_week_engagement)
print("\n互动最高的帖子:")
print(top_posts[['post_id', 'likes', 'shares', 'comments', 'engagement']].head())
def should_generate_report(self):
# 只有当有高互动帖子时才生成报告
return any(self.data['engagement'] > 1000)
def generate_report(self):
print("生成社交媒体报告...")
# 创建报告目录
os.makedirs('reports', exist_ok=True)
# 生成报告文件名
report_file = f"reports/social_media_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
self.report_path = report_file
# 写入报告
with open(report_file, 'w') as f:
f.write("社交媒体分析报告\n")
f.write("=" * 50 + "\n\n")
f.write("每日互动统计:\n")
f.write(str(self.results['daily_engagement'].tail(5)) + "\n\n")
f.write("星期几互动统计:\n")
f.write(str(self.results['day_of_week_engagement']) + "\n\n")
f.write("互动最高的帖子:\n")
top_posts = self.results['top_posts'][['post_id', 'likes', 'shares', 'comments', 'engagement']]
f.write(str(top_posts.head()) + "\n\n")
f.write("用户互动统计 (前5名):\n")
f.write(str(self.results['user_engagement'].head(5)) + "\n")
print(f"社交媒体报告已保存到 {report_file}")
# 测试代码
def test_data_analyzers():
# 创建并测试销售数据分析器
sales_analyzer = SalesDataAnalyzer("sales_data.csv")
sales_analyzer.analyze()
# 创建并测试网站流量分析器
web_analyzer = WebTrafficAnalyzer("web_logs.json")
web_analyzer.analyze()
# 创建并测试社交媒体分析器
social_analyzer = SocialMediaAnalyzer("social_api")
social_analyzer.analyze()
# 测试钩子方法
print("\n=== 测试钩子方法 ===")
# 创建一个不生成报告的分析器
class NoReportAnalyzer(SalesDataAnalyzer):
def should_generate_report(self):
return False
no_report_analyzer = NoReportAnalyzer("sales_data.csv")
no_report_analyzer.analyze()
if __name__ == "__main__":
test_data_analyzers()
11. 综合应用题:电子商务系统
题目描述:
设计一个简化版的电子商务系统,综合应用多种设计模式。系统需要支持商品浏览、购物车管理、订单处理和支付等功能。
要求:
- 使用工厂模式创建不同类型的商品
- 使用单例模式管理购物车
- 使用观察者模式实现商品库存变化通知
- 使用策略模式处理不同的支付方式
- 使用命令模式实现订单操作(创建、取消、修改)
- 使用装饰器模式实现商品的附加服务(如礼品包装、快递保险等)
- 系统应该具有良好的可扩展性和可维护性
- 编写测试代码,验证系统的各项功能
代码框架:
# 这里只提供基本框架,需要综合应用多种设计模式实现完整功能
# 商品相关类
class Product:
# 商品基类
pass
class ProductFactory:
# 商品工厂
pass
# 购物车相关类
class ShoppingCart:
# 购物车(单例)
pass
# 库存相关类
class InventorySubject:
# 库存主题
pass
class InventoryObserver:
# 库存观察者接口
pass
# 支付相关类
class PaymentStrategy:
# 支付策略接口
pass
# 订单相关类
class OrderCommand:
# 订单命令接口
pass
# 商品服务装饰器
class ProductServiceDecorator:
# 商品服务装饰器
pass
# 电子商务系统
class ECommerceSystem:
# 整合各组件的系统类
pass
# 测试代码
def test_ecommerce_system():
# 测试电子商务系统的各项功能
pass
参考答案:
from abc import ABC, abstractmethod
import uuid
from datetime import datetime
# =============== 1. 工厂模式 - 商品创建 ===============
class Product:
def __init__(self, id, name, price, category, description):
self.id = id
self.name = name
self.price = price
self.category = category
self.description = description
def __str__(self):
return f"{self.name} (${self.price:.2f})"
class ElectronicProduct(Product):
def __init__(self, id, name, price, description, warranty_period, brand):
super().__init__(id, name, price, "Electronics", description)
self.warranty_period = warranty_period
self.brand = brand
class ClothingProduct(Product):
def __init__(self, id, name, price, description, size, color):
super().__init__(id, name, price, "Clothing", description)
self.size = size
self.color = color
class BookProduct(Product):
def __init__(self, id, name, price, description, author, publisher):
super().__init__(id, name, price, "Books", description)
self.author = author
self.publisher = publisher
class ProductFactory:
@staticmethod
def create_product(product_type, **kwargs):
if product_type == "electronics":
return ElectronicProduct(
id=kwargs.get('id', str(uuid.uuid4())),
name=kwargs.get('name', ''),
price=kwargs.get('price', 0.0),
description=kwargs.get('description', ''),
warranty_period=kwargs.get('warranty_period', '1 year'),
brand=kwargs.get('brand', '')
)
elif product_type == "clothing":
return ClothingProduct(
id=kwargs.get('id', str(uuid.uuid4())),
name=kwargs.get('name', ''),
price=kwargs.get('price', 0.0),
description=kwargs.get('description', ''),
size=kwargs.get('size', 'M'),
color=kwargs.get('color', 'Black')
)
elif product_type == "book":
return BookProduct(
id=kwargs.get('id', str(uuid.uuid4())),
name=kwargs.get('name', ''),
price=kwargs.get('price', 0.0),
description=kwargs.get('description', ''),
author=kwargs.get('author', ''),
publisher=kwargs.get('publisher', '')
)
else:
raise ValueError(f"Unknown product type: {product_type}")
# =============== 2. 单例模式 - 购物车 ===============
class ShoppingCart:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(ShoppingCart, cls).__new__(cls)
cls._instance.items = {} # 商品ID -> (商品, 数量)
return cls._instance
def add_item(self, product, quantity=1):
if product.id in self.items:
self.items[product.id] = (product, self.items[product.id][1] + quantity)
else:
self.items[product.id] = (product, quantity)
def remove_item(self, product_id):
if product_id in self.items:
del self.items[product_id]
def update_quantity(self, product_id, quantity):
if product_id in self.items:
if quantity <= 0:
self.remove_item(product_id)
else:
product = self.items[product_id][0]
self.items[product_id] = (product, quantity)
def get_total(self):
return sum(product.price * quantity for product, quantity in self.items.values())
def clear(self):
self.items.clear()
def get_items(self):
return [(product, quantity) for product, quantity in self.items.values()]
def __str__(self):
if not self.items:
return "购物车为空"
cart_str = "购物车内容:\n"
for product, quantity in self.items.values():
cart_str += f" - {product.name} x {quantity}: ${product.price * quantity:.2f}\n"
cart_str += f"总计: ${self.get_total():.2f}"
return cart_str
# =============== 3. 观察者模式 - 库存管理 ===============
class InventoryObserver(ABC):
@abstractmethod
def update(self, product_id, new_quantity):
pass
class InventorySubject:
def __init__(self):
self.observers = []
self.inventory = {} # 商品ID -> 数量
def register_observer(self, observer):
if observer not in self.observers:
self.observers.append(observer)
def remove_observer(self, observer):
if observer in self.observers:
self.observers.remove(observer)
def notify_observers(self, product_id, new_quantity):
for observer in self.observers:
observer.update(product_id, new_quantity)
def add_product(self, product, quantity):
self.inventory[product.id] = quantity
def update_quantity(self, product_id, quantity):
if product_id in self.inventory:
old_quantity = self.inventory[product_id]
self.inventory[product_id] = quantity
if old_quantity != quantity:
self.notify_observers(product_id, quantity)
def get_quantity(self, product_id):
return self.inventory.get(product_id, 0)
def is_available(self, product_id, quantity=1):
return self.get_quantity(product_id) >= quantity
class LowStockNotifier(InventoryObserver):
def __init__(self, threshold=5):
self.threshold = threshold
def update(self, product_id, new_quantity):
if new_quantity <= self.threshold:
print(f"低库存警告: 商品 {product_id} 库存仅剩 {new_quantity} 件")
class OutOfStockNotifier(InventoryObserver):
def update(self, product_id, new_quantity):
if new_quantity == 0:
print(f"缺货警告: 商品 {product_id} 已售罄")
# =============== 4. 策略模式 - 支付处理 ===============
class PaymentStrategy(ABC):
@abstractmethod
def pay(self, amount):
pass
class CreditCardPayment(PaymentStrategy):
def __init__(self, card_number, expiry_date, cvv):
self.card_number = card_number
self.expiry_date = expiry_date
self.cvv = cvv
def pay(self, amount):
# 实际应用中,这里会调用信用卡支付API
print(f"使用信用卡支付 ${amount:.2f}")
print(f"卡号: {self.card_number[-4:]} (到期日: {self.expiry_date})")
return {"success": True, "transaction_id": str(uuid.uuid4())}
class PayPalPayment(PaymentStrategy):
def __init__(self, email):
self.email = email
def pay(self, amount):
# 实际应用中,这里会调用PayPal API
print(f"使用PayPal支付 ${amount:.2f}")
print(f"PayPal账户: {self.email}")
return {"success": True, "transaction_id": str(uuid.uuid4())}
class BankTransferPayment(PaymentStrategy):
def __init__(self, account_number, bank_code):
self.account_number = account_number
self.bank_code = bank_code
def pay(self, amount):
# 实际应用中,这里会调用银行转账API
print(f"使用银行转账支付 ${amount:.2f}")
print(f"账号: {self.account_number}, 银行代码: {self.bank_code}")
return {"success": True, "transaction_id": str(uuid.uuid4())}
# =============== 5. 命令模式 - 订单处理 ===============
class Order:
def __init__(self, order_id, items, customer_info, payment_info, status="pending"):
self.order_id = order_id
self.items = items # [(product, quantity), ...]
self.customer_info = customer_info
self.payment_info = payment_info
self.status = status
self.created_at = datetime.now()
self.updated_at = self.created_at
def update_status(self, status):
self.status = status
self.updated_at = datetime.now()
def get_total(self):
return sum(product.price * quantity for product, quantity in self.items)
def __str__(self):
order_str = f"订单 #{self.order_id} ({self.status})\n"
order_str += f"创建时间: {self.created_at}\n"
order_str += "商品:\n"
for product, quantity in self.items:
order_str += f" - {product.name} x {quantity}: ${product.price * quantity:.2f}\n"
order_str += f"总计: ${self.get_total():.2f}"
return order_str
class OrderCommand(ABC):
@abstractmethod
def execute(self):
pass
@abstractmethod
def undo(self):
pass
class CreateOrderCommand(OrderCommand):
def __init__(self, order_manager, items, customer_info, payment_strategy):
self.order_manager = order_manager
self.items = items
self.customer_info = customer_info
self.payment_strategy = payment_strategy
self.order = None
def execute(self):
# 创建订单
order_id = str(uuid.uuid4())
self.order = Order(order_id, self.items, self.customer_info, self.payment_strategy)
# 处理支付
amount = self.order.get_total()
payment_result = self.payment_strategy.pay(amount)
if payment_result["success"]:
self.order.update_status("paid")
self.order_manager.add_order(self.order)
# 更新库存
for product, quantity in self.items:
current_quantity = self.order_manager.inventory.get_quantity(product.id)
self.order_manager.inventory.update_quantity(product.id, current_quantity - quantity)
return self.order
else:
self.order.update_status("payment_failed")
return None
def undo(self):
if self.order and self.order.status == "paid":
# 恢复库存
for product, quantity in self.items:
current_quantity = self.order_manager.inventory.get_quantity(product.id)
self.order_manager.inventory.update_quantity(product.id, current_quantity + quantity)
# 移除订单
self.order_manager.remove_order(self.order.order_id)
return True
return False
class CancelOrderCommand(OrderCommand):
def __init__(self, order_manager, order_id):
self.order_manager = order_manager
self.order_id = order_id
self.order = None
def execute(self):
self.order = self.order_manager.get_order(self.order_id)
if self.order and self.order.status in ["pending", "paid"]:
# 恢复库存
for product, quantity in self.order.items:
current_quantity = self.order_manager.inventory.get_quantity(product.id)
self.order_manager.inventory.update_quantity(product.id, current_quantity + quantity)
self.order.update_status("cancelled")
return True
return False
def undo(self):
if self.order and self.order.status == "cancelled":
# 重新扣减库存
for product, quantity in self.order.items:
current_quantity = self.order_manager.inventory.get_quantity(product.id)
self.order_manager.inventory.update_quantity(product.id, current_quantity - quantity)
self.order.update_status("paid")
return True
return False
class OrderManager:
def __init__(self, inventory):
self.orders = {} # 订单ID -> 订单
self.inventory = inventory
def add_order(self, order):
self.orders[order.order_id] = order
def remove_order(self, order_id):
if order_id in self.orders:
del self.orders[order_id]
def get_order(self, order_id):
return self.orders.get(order_id)
def get_all_orders(self):
return list(self.orders.values())
# =============== 6. 装饰器模式 - 商品服务 ===============
class ProductService(ABC):
@abstractmethod
def get_description(self):
pass
@abstractmethod
def get_cost(self):
pass
class ConcreteProduct(ProductService):
def __init__(self, product, quantity=1):
self.product = product
self.quantity = quantity
def get_description(self):
return f"{self.product.name} x {self.quantity}"
def get_cost(self):
return self.product.price * self.quantity
class ProductServiceDecorator(ProductService):
def __init__(self, product_service):
self.product_service = product_service
def get_description(self):
return self.product_service.get_description()
def get_cost(self):
return self.product_service.get_cost()
class GiftWrappingDecorator(ProductServiceDecorator):
def __init__(self, product_service, wrapping_style="标准"):
super().__init__(product_service)
self.wrapping_style = wrapping_style
def get_description(self):
return f"{self.product_service.get_description()} [礼品包装: {self.wrapping_style}]"
def get_cost(self):
# 礼品包装额外收费
wrapping_cost = 5.0 if self.wrapping_style == "豪华" else 2.0
return self.product_service.get_cost() + wrapping_cost
class ExpressShippingDecorator(ProductServiceDecorator):
def get_description(self):
return f"{self.product_service.get_description()} [快递配送]"
def get_cost(self):
# 快递配送额外收费
return self.product_service.get_cost() + 10.0
class InsuranceDecorator(ProductServiceDecorator):
def get_description(self):
return f"{self.product_service.get_description()} [运输保险]"
def get_cost(self):
# 保险费用为商品价值的5%
return self.product_service.get_cost() + (self.product_service.get_cost() * 0.05)
# =============== 电子商务系统 ===============
class ECommerceSystem:
def __init__(self):
self.product_factory = ProductFactory()
self.shopping_cart = ShoppingCart()
self.inventory = InventorySubject()
self.order_manager = OrderManager(self.inventory)
# 注册库存观察者
self.inventory.register_observer(LowStockNotifier(threshold=5))
self.inventory.register_observer(OutOfStockNotifier())
def create_product(self, product_type, **kwargs):
return self.product_factory.create_product(product_type, **kwargs)
def add_product_to_inventory(self, product, quantity):
self.inventory.add_product(product, quantity)
def add_to_cart(self, product, quantity=1):
if self.inventory.is_available(product.id, quantity):
self.shopping_cart.add_item(product, quantity)
return True
return False
def remove_from_cart(self, product_id):
self.shopping_cart.remove_item(product_id)
def update_cart_quantity(self, product_id, quantity):
self.shopping_cart.update_quantity(product_id, quantity)
def checkout(self, customer_info, payment_strategy):
items = self.shopping_cart.get_items()
if not items:
print("购物车为空,无法结账")
return None
# 检查库存
for product, quantity in items:
if not self.inventory.is_available(product.id, quantity):
print(f"商品 {product.name} 库存不足,无法结账")
return None
# 创建订单
command = CreateOrderCommand(self.order_manager, items, customer_info, payment_strategy)
order = command.execute()
if order:
print(f"订单创建成功: {order.order_id}")
self.shopping_cart.clear()
else:
print("订单创建失败")
return order
def cancel_order(self, order_id):
command = CancelOrderCommand(self.order_manager, order_id)
if command.execute():
print(f"订单 {order_id} 已取消")
return True
else:
print(f"无法取消订单 {order_id}")
return False
def get_order(self, order_id):
return self.order_manager.get_order(order_id)
def get_all_orders(self):
return self.order_manager.get_all_orders()
def apply_product_service(self, product, quantity=1, gift_wrapping=False, express_shipping=False, insurance=False, wrapping_style="标准"):
# 创建基础商品服务
product_service = ConcreteProduct(product, quantity)
# 应用装饰器
if gift_wrapping:
product_service = GiftWrappingDecorator(product_service, wrapping_style)
if express_shipping:
product_service = ExpressShippingDecorator(product_service)
if insurance:
product_service = InsuranceDecorator(product_service)
return product_service
# =============== 测试代码 ===============
def test_ecommerce_system():
# 创建电子商务系统
system = ECommerceSystem()
# 创建商品
laptop = system.create_product("electronics",
name="MacBook Pro",
price=1299.99,
description="13-inch, 8GB RAM, 256GB SSD",
warranty_period="2 years",
brand="Apple")
tshirt = system.create_product("clothing",
name="Cotton T-Shirt",
price=19.99,
description="Comfortable cotton t-shirt",
size="L",
color="Blue")
book = system.create_product("book",
name="Python Design Patterns",
price=39.99,
description="Learn design patterns in Python",
author="John Smith",
publisher="Tech Books")
# 添加商品到库存
system.add_product_to_inventory(laptop, 10)
system.add_product_to_inventory(tshirt, 50)
system.add_product_to_inventory(book, 20)
print("\n=== 测试工厂模式 ===")
print(f"电子产品: {laptop}")
print(f"服装: {tshirt}")
print(f"图书: {book}")
# 测试购物车(单例模式)
print("\n=== 测试单例模式 ===")
cart1 = ShoppingCart()
cart2 = ShoppingCart()
print(f"cart1 和 cart2 是同一个实例: {cart1 is cart2}")
# 添加商品到购物车
system.add_to_cart(laptop, 1)
system.add_to_cart(tshirt, 2)
print(system.shopping_cart)
# 测试观察者模式
print("\n=== 测试观察者模式 ===")
# 减少库存,触发低库存通知
current_quantity = system.inventory.get_quantity(laptop.id)
system.inventory.update_quantity(laptop.id, 3)
# 减少库存至0,触发缺货通知
system.inventory.update_quantity(book.id, 0)
# 测试策略模式
print("\n=== 测试策略模式 ===")
credit_card = CreditCardPayment("4111-1111-1111-1111", "12/25", "123")
paypal = PayPalPayment("customer@example.com")
bank_transfer = BankTransferPayment("12345678", "ABCDEF")
# 客户信息
customer_info = {
"name": "张三",
"email": "zhangsan@example.com",
"address": "北京市朝阳区123号",
"phone": "13800138000"
}
# 使用信用卡结账
order1 = system.checkout(customer_info, credit_card)
# 再次添加商品到购物车
system.add_to_cart(book, 1) # 这应该失败,因为书已经没有库存了
system.add_to_cart(tshirt, 1)
# 使用PayPal结账
order2 = system.checkout(customer_info, paypal)
# 测试命令模式
print("\n=== 测试命令模式 ===")
# 取消订单
if order1:
system.cancel_order(order1.order_id)
# 查看所有订单
print("\n所有订单:")
for order in system.get_all_orders():
print(order)
# 测试装饰器模式
print("\n=== 测试装饰器模式 ===")
# 创建带有附加服务的商品
laptop_with_services = system.apply_product_service(
laptop,
quantity=1,
gift_wrapping=True,
express_shipping=True,
insurance=True,
wrapping_style="豪华"
)
print(f"商品描述: {laptop_with_services.get_description()}")
print(f"总价: ${laptop_with_services.get_cost():.2f}")
# 添加库存以便测试
system.inventory.update_quantity(laptop.id, 5)
# 添加带服务的商品到购物车
system.add_to_cart(laptop, 1)
# 使用银行转账结账
order3 = system.checkout(customer_info, bank_transfer)
print("\n最终订单列表:")
for order in system.get_all_orders():
print(order)
if __name__ == "__main__":
test_ecommerce_system()
- 0
- 0
-
分享