在一个逻辑上实现了CQRS的单体(Monolith)或单一仓库(Monorepo)项目中,命令模型(Command Model)的内部实现细节常常会无意间渗透到查询模型(Query Model)中。团队可能会为了便利,在查询服务中直接引用命令侧的领域实体或内部DTO,这看似减少了代码重复,实则埋下了耦合的种子。当命令侧的领域模型因业务变更而重构时,原本独立的查询服务也被迫一同修改、测试和部署,CQRS模式带来的核心优势——读写分离、独立演化——被大大削弱。
核心挑战在于:如何从架构层面强制执行命令与查询之间的边界,确保它们不仅在逻辑上,更在物理上彻底解耦?
方案A:Monorepo下的逻辑隔离
这是最常见的起步方式。在一个Git仓库中,通过模块化(例如Gradle的子项目)来划分命令和查询的界限。
// monorepo-project/
// ├── build.gradle.kts
// ├── settings.gradle.kts (includes 'order-command', 'order-query', 'order-contracts')
// │
// ├── order-contracts/
// │ └── src/main/kotlin/.../events/OrderEvents.kt
// │
// ├── order-command/
// │ ├── build.gradle.kts (implementation(project(":order-contracts")))
// │ └── src/main/kotlin/.../domain/OrderAggregate.kt
// │
// └── order-query/
// ├── build.gradle.kts (implementation(project(":order-contracts")))
// └── src/main/kotlin/.../projection/OrderProjection.kt
优势分析:
- 统一构建与版本控制: 单一的构建命令可以编译和测试整个系统。跨模块的重构由IDE和编译器保证原子性。
- 本地开发便利: 开发者克隆一个仓库即可运行完整的系统,无需配置多个项目。
- 代码共享直接: 共享的
contracts
模块可以直接通过项目依赖引入,无需发布到制品库。
劣势与风险:
最大的风险在于纪律涣散。没有任何物理屏障能阻止一个缺乏经验的开发者在order-query
模块中添加对order-command
的直接依赖。
// 在 order-query/build.gradle.kts 中一个错误的依赖添加
// 这在 Monorepo 结构中很容易发生,并且在代码审查中可能被忽略
dependencies {
implementation(project(":order-contracts"))
// 错误!不应该直接依赖命令模块的内部实现
implementation(project(":order-command"))
}
一旦发生这种情况,查询逻辑就可以直接访问命令侧的聚合根、仓储甚至命令处理器,CQRS的边界便被彻底打破。此外,整个系统共享同一个部署流水线,查询模型的微小变更也需要触发整个应用的构建和部署流程,无法实现真正的独立扩展和发布。
方案B:Polyrepo下的物理隔离
该方案为命令和查询两侧分别建立独立的Git仓库,它们之间唯一的联系是一个独立版本化的“契约”仓库。
// 仓库一:order-contracts.git
// └── src/main/kotlin/.../events/OrderEvents.kt
//
// 仓库二:order-command-service.git
// └── build.gradle.kts (implementation("com.mycompany:order-contracts:1.0.1"))
//
// 仓库三:order-query-service.git
// └── build.gradle.kts (implementation("com.mycompany:order-contracts:1.0.1"))
这种结构从物理上杜绝了跨边界引用的可能性。order-query-service
的构建环境里根本不存在order-command-service
的源代码,耦合无从发生。
优势分析:
- 强制解耦: 物理隔离是保障边界最强有力的手段。团队必须通过预先定义的事件或API契约进行通信。
- 独立生命周期: 每个服务拥有自己的构建、测试、部署流水线和版本号。查询服务可以独立于命令服务进行部署和扩缩容。
- 技术异构性: 命令侧和查询侧可以选用完全不同的技术栈。例如,命令侧使用事件溯源和内存数据库,而查询侧使用Elasticsearch进行全文检索。
- 团队所有权清晰: 不同的团队可以完全拥有各自的服务仓库,权责分明。
劣势与挑战:
- 基础设施复杂性: 需要维护多个Git仓库、多个CI/CD流水线以及一个私有制品库(如Artifactory, Nexus)来托管契约包。
- 本地开发环境: 开发者需要同时运行多个服务、一个消息中间件和一个数据库,通常需要借助Docker Compose等工具来管理。
- 契约版本管理: 共享的
order-contracts
包的版本管理变得至关重要。升级契约版本需要在多个服务间进行协调,处理不当会导致兼容性问题。
最终选择与理由
对于一个追求长期可维护性、需要支持多个团队并行开发、且读写负载模式显著不同的复杂系统,我们选择方案B:Polyrepo物理隔离。尽管它带来了更高的初始基建成本和开发环境的复杂性,但其换来的是架构层面的清晰、团队的自主性以及系统演化的灵活性。这种前期投入是为了避免后期在泥潭般的耦合代码中挣扎,是一种战略性的架构决策。
核心实现概览
我们将通过一个简化的订单处理场景来展示Polyrepo下的CQRS实现。系统由三个核心仓库构成:order-contracts
、order-command-service
和order-query-service
。通信通过Kafka进行。
graph TD subgraph "order-command-service.git" A[Client API] --> B{Command Handler} B --> C[Order Aggregate] C -- Persists Event --> D[Event Store - Kafka] end subgraph "order-contracts.git (Published as JAR)" E[Shared Events & Commands DTOs] end subgraph "order-query-service.git" F[Event Consumer] -- Consumes Event --> D F --> G{Order Projector} G -- Updates Read Model --> H[Query Database - PostgreSQL] I[Client Query API] --> J[Query Handler] J -- Reads from --> H end C -- Uses --> E F -- Uses --> E
1. 契约仓库: order-contracts
这是整个架构的基石,定义了系统内所有跨服务通信的数据结构。它的产出是一个被版本化的JAR包。
order-contracts/build.gradle.kts
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
kotlin("jvm") version "1.8.21"
`maven-publish`
}
group = "com.mycompany.arch"
version = "1.0.0-SNAPSHOT"
repositories {
mavenCentral()
}
// 配置发布到私有Maven仓库
publishing {
publications {
create<MavenPublication>("maven") {
from(components["java"])
}
}
repositories {
maven {
name = "nexus"
url = uri("https://your-nexus-repo/repository/maven-snapshots/")
credentials {
username = System.getenv("NEXUS_USER")
password = System.getenv("NEXUS_PASSWORD")
}
}
}
}
order-contracts/src/main/kotlin/com/mycompany/arch/order/contracts.kt
package com.mycompany.arch.order
import java.math.BigDecimal
import java.time.Instant
import java.util.UUID
// --- Commands ---
// 命令是意图的表达,通常是祈使句
data class CreateOrderCommand(
val orderId: UUID,
val customerId: String,
val products: List<ProductInfo>
)
data class AddProductToOrderCommand(
val orderId: UUID,
val productId: String,
val quantity: Int,
val unitPrice: BigDecimal
)
data class ConfirmOrderCommand(
val orderId: UUID
)
// --- Events ---
// 事件是已经发生事实的记录,通常是过去式
// 使用 sealed interface 可以很好地组织领域事件
sealed interface OrderEvent {
val orderId: UUID
val occurredOn: Instant
}
data class OrderCreatedEvent(
override val orderId: UUID,
val customerId: String,
val products: List<ProductInfo>,
override val occurredOn: Instant = Instant.now()
) : OrderEvent
data class ProductAddedToOrderEvent(
override val orderId: UUID,
val productId: String,
val quantity: Int,
val unitPrice: BigDecimal,
override val occurredOn: Instant = Instant.now()
) : OrderEvent
data class OrderConfirmedEvent(
override val orderId: UUID,
val confirmationTime: Instant,
override val occurredOn: Instant = Instant.now()
) : OrderEvent
data class OrderCreationFailedEvent(
override val orderId: UUID,
val reason: String,
override val occurredOn: Instant = Instant.now()
) : OrderEvent
// --- Shared DTOs ---
data class ProductInfo(
val productId: String,
val quantity: Int,
val unitPrice: BigDecimal
)
这个仓库的代码极其简洁,只包含数据结构。完成后,通过./gradlew publish
将其发布到Nexus。
2. 命令服务: order-command-service
这个服务负责接收命令、执行业务逻辑、并产生事件。
order-command-service/build.gradle.kts
plugins {
id("org.springframework.boot") version "3.1.5"
id("io.spring.dependency-management") version "1.1.3"
kotlin("jvm") version "1.8.21"
kotlin("plugin.spring") version "1.8.21"
}
// ... spring boot config ...
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.kafka:spring-kafka")
// 关键:依赖于发布的契约包,而不是本地项目
implementation("com.mycompany.arch:order-contracts:1.0.0-SNAPSHOT")
// Jackson, logging, etc.
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
// 测试依赖
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
}
order-command-service/src/main/kotlin/.../domain/OrderAggregate.kt
package com.mycompany.arch.order.domain
import com.mycompany.arch.order.*
import java.math.BigDecimal
import java.util.UUID
import org.slf4j.LoggerFactory
// 这是一个简化的聚合根,用于演示业务逻辑和事件生成
// 在真实项目中,这会更复杂,并且会使用事件溯源
class OrderAggregate(private val state: OrderState) {
private val log = LoggerFactory.getLogger(javaClass)
// 业务方法处理命令,返回事件列表
fun handle(command: CreateOrderCommand): List<OrderEvent> {
if (state.status != OrderStatus.NON_EXISTENT) {
log.warn("Order {} already exists. Ignoring CreateOrderCommand.", command.orderId)
return listOf(OrderCreationFailedEvent(command.orderId, "Order already exists"))
}
if (command.products.isEmpty()) {
log.error("Cannot create order {} with no products.", command.orderId)
return listOf(OrderCreationFailedEvent(command.orderId, "Cannot create order with empty products"))
}
return listOf(OrderCreatedEvent(command.orderId, command.customerId, command.products))
}
fun handle(command: ConfirmOrderCommand): List<OrderEvent> {
if (state.status != OrderStatus.PENDING) {
log.error("Cannot confirm order {} in state {}", command.orderId, state.status)
throw IllegalStateException("Order must be in PENDING state to be confirmed")
}
return listOf(OrderConfirmedEvent(command.orderId, confirmationTime = java.time.Instant.now()))
}
// ... 其他命令处理方法 ...
companion object {
// 用于从事件历史中重建聚合状态
fun rehydrate(events: List<OrderEvent>): OrderAggregate {
val initialState = OrderState(UUID.randomUUID(), "", OrderStatus.NON_EXISTENT, emptyList(), BigDecimal.ZERO)
val finalState = events.fold(initialState) { state, event -> applyEvent(state, event) }
return OrderAggregate(finalState)
}
// 状态转换逻辑
private fun applyEvent(currentState: OrderState, event: OrderEvent): OrderState {
return when (event) {
is OrderCreatedEvent -> currentState.copy(
id = event.orderId,
customerId = event.customerId,
status = OrderStatus.PENDING,
products = event.products,
totalAmount = event.products.sumOf { it.unitPrice * BigDecimal(it.quantity) }
)
is OrderConfirmedEvent -> currentState.copy(status = OrderStatus.CONFIRMED)
// ... 其他事件应用逻辑 ...
else -> currentState
}
}
}
}
// 聚合的内部状态
data class OrderState(
val id: UUID,
val customerId: String,
val status: OrderStatus,
val products: List<ProductInfo>,
val totalAmount: BigDecimal
)
enum class OrderStatus { NON_EXISTENT, PENDING, CONFIRMED, CANCELLED }
order-command-service/src/main/kotlin/.../service/OrderCommandHandler.kt
package com.mycompany.arch.order.service
import com.mycompany.arch.order.*
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service
@Service
class OrderCommandHandler(
// 假设有一个事件溯源的仓储
private val orderRepository: OrderEventSourcedRepository,
private val kafkaTemplate: KafkaTemplate<String, OrderEvent>
) {
private val log = LoggerFactory.getLogger(javaClass)
private val orderEventTopic = "order-events"
fun process(command: CreateOrderCommand) {
// 在真实项目中,应该先加载聚合,但这里是创建,所以直接构建
val emptyAggregate = orderRepository.load(command.orderId) // 会返回一个初始状态的聚合
val events = try {
emptyAggregate.handle(command)
} catch (e: Exception) {
log.error("Business logic exception for command {}: {}", command, e.message)
// 可以发布一个失败事件
return
}
if (events.isNotEmpty()) {
orderRepository.save(command.orderId, events)
events.forEach { event ->
// 使用事务性发件箱模式来保证事件一定能被发出
// 这里为了简化,直接发送
kafkaTemplate.send(orderEventTopic, event.orderId.toString(), event)
.whenComplete { result, ex ->
if (ex != null) {
log.error("Failed to send event {} to Kafka", event, ex)
// 需要有重试或死信队列机制
} else {
log.info("Event {} sent to topic {}", event.javaClass.simpleName, result?.recordMetadata?.topic())
}
}
}
}
}
// ... 其他命令的处理方法 ...
}
// 仓储的接口,具体实现可以是基于 Kafka Streams, Akka Persistence, Axon Framework 等
interface OrderEventSourcedRepository {
fun save(aggregateId: UUID, events: List<OrderEvent>)
fun load(aggregateId: UUID): OrderAggregate
}
3. 查询服务: order-query-service
该服务消费事件,构建并维护一个用于查询的物化视图。
order-query-service/build.gradle.kts
// ... 插件配置与命令服务类似 ...
dependencies {
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
implementation("org.springframework.kafka:spring-kafka")
runtimeOnly("org.postgresql:postgresql")
// 同样依赖于发布的契约包
implementation("com.mycompany.arch:order-contracts:1.0.0-SNAPSHOT")
// ... 其他依赖 ...
}
order-query-service/src/main/kotlin/.../projection/OrderSummaryProjection.kt
package com.mycompany.arch.order.projection
import com.mycompany.arch.order.*
import org.slf4j.LoggerFactory
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
import org.springframework.stereotype.Repository
import jakarta.persistence.*
import java.math.BigDecimal
import java.time.Instant
import java.util.UUID
@Component
class OrderSummaryProjection(private val repository: OrderSummaryRepository) {
private val log = LoggerFactory.getLogger(javaClass)
// 监听事件主题
@KafkaListener(topics = ["order-events"], groupId = "order-summary-projector")
fun handle(event: OrderEvent) {
log.info("Received event: {}", event.javaClass.simpleName)
try {
when (event) {
is OrderCreatedEvent -> createOrderSummary(event)
is OrderConfirmedEvent -> confirmOrderSummary(event)
// 对于不关心的事件,直接忽略
else -> log.debug("Ignoring event of type {}", event.javaClass.simpleName)
}
} catch (e: Exception) {
// 这里的错误处理至关重要
// 如果处理失败,应该进入死信队列,人工干预,否则会阻塞整个分区的消费
log.error("Failed to project event {}: {}", event, e.message, e)
throw e // 抛出异常,让 Kafka consumer 进行重试或进入死信
}
}
private fun createOrderSummary(event: OrderCreatedEvent) {
val totalAmount = event.products.sumOf { it.unitPrice * BigDecimal(it.quantity) }
val summary = OrderSummary(
id = event.orderId,
customerId = event.customerId,
status = "PENDING",
totalAmount = totalAmount,
productCount = event.products.sumOf { it.quantity },
createdAt = event.occurredOn,
updatedAt = event.occurredOn
)
repository.save(summary)
log.info("Created order summary for orderId {}", event.orderId)
}
private fun confirmOrderSummary(event: OrderConfirmedEvent) {
val summary = repository.findById(event.orderId)
.orElseThrow { IllegalStateException("Order summary not found for id ${event.orderId}") }
summary.status = "CONFIRMED"
summary.updatedAt = event.occurredOn
repository.save(summary)
log.info("Confirmed order summary for orderId {}", event.orderId)
}
}
// 查询模型 (Read Model)
@Entity
@Table(name = "order_summaries")
data class OrderSummary(
@Id
var id: UUID,
var customerId: String,
var status: String,
var totalAmount: BigDecimal,
var productCount: Int,
var createdAt: Instant,
var updatedAt: Instant
)
// 使用 Spring Data JPA 简化数据库操作
@Repository
interface OrderSummaryRepository : JpaRepository<OrderSummary, UUID>
架构的扩展性与局限性
这种物理隔离的架构模式提供了极佳的扩展性。例如,如果需要一个为数据分析团队服务的、包含订单明细的只读模型,我们只需创建一个新的order-detail-query-service
仓库,让它同样消费order-events
主题的事件,并将其投射到ClickHouse或类似的数据仓库中。这一切都无需对命令服务或现有的查询服务做任何改动。
然而,该架构的局限性也同样明显:
- 最终一致性: 从命令执行成功到查询模型更新之间存在延迟。UI设计必须能处理这种延迟,例如通过轮询、WebSocket推送或向用户显示“订单处理中”的状态。这不是一个技术问题,而是一个必须与产品和业务达成共识的权衡。
- 开发运维成本: Polyrepo显著增加了运维的复杂性。需要自动化的CI/CD流水线、统一的日志收集、分布式追踪(如OpenTelemetry)来诊断跨服务问题。本地开发环境的搭建也对新成员有一定的学习曲线。
- 事件契约演进:
order-contracts
是系统的“脊椎”。对事件结构进行不兼容的修改(如删除字段)是一项高风险操作,需要精心的版本控制策略(如Schema Registry、多版本事件处理器)和跨团队的协调发布,以确保新旧服务都能正确处理过渡期的事件。