在 Kotlin 中利用 Polyrepo 物理隔离实现 CQRS 的架构权衡与实践


在一个逻辑上实现了CQRS的单体(Monolith)或单一仓库(Monorepo)项目中,命令模型(Command Model)的内部实现细节常常会无意间渗透到查询模型(Query Model)中。团队可能会为了便利,在查询服务中直接引用命令侧的领域实体或内部DTO,这看似减少了代码重复,实则埋下了耦合的种子。当命令侧的领域模型因业务变更而重构时,原本独立的查询服务也被迫一同修改、测试和部署,CQRS模式带来的核心优势——读写分离、独立演化——被大大削弱。

核心挑战在于:如何从架构层面强制执行命令与查询之间的边界,确保它们不仅在逻辑上,更在物理上彻底解耦?

方案A:Monorepo下的逻辑隔离

这是最常见的起步方式。在一个Git仓库中,通过模块化(例如Gradle的子项目)来划分命令和查询的界限。

// monorepo-project/
// ├── build.gradle.kts
// ├── settings.gradle.kts (includes 'order-command', 'order-query', 'order-contracts')
// │
// ├── order-contracts/
// │   └── src/main/kotlin/.../events/OrderEvents.kt
// │
// ├── order-command/
// │   ├── build.gradle.kts (implementation(project(":order-contracts")))
// │   └── src/main/kotlin/.../domain/OrderAggregate.kt
// │
// └── order-query/
//     ├── build.gradle.kts (implementation(project(":order-contracts")))
//     └── src/main/kotlin/.../projection/OrderProjection.kt

优势分析:

  1. 统一构建与版本控制: 单一的构建命令可以编译和测试整个系统。跨模块的重构由IDE和编译器保证原子性。
  2. 本地开发便利: 开发者克隆一个仓库即可运行完整的系统,无需配置多个项目。
  3. 代码共享直接: 共享的contracts模块可以直接通过项目依赖引入,无需发布到制品库。

劣势与风险:
最大的风险在于纪律涣散。没有任何物理屏障能阻止一个缺乏经验的开发者在order-query模块中添加对order-command的直接依赖。

// 在 order-query/build.gradle.kts 中一个错误的依赖添加
// 这在 Monorepo 结构中很容易发生,并且在代码审查中可能被忽略
dependencies {
    implementation(project(":order-contracts"))
    // 错误!不应该直接依赖命令模块的内部实现
    implementation(project(":order-command")) 
}

一旦发生这种情况,查询逻辑就可以直接访问命令侧的聚合根、仓储甚至命令处理器,CQRS的边界便被彻底打破。此外,整个系统共享同一个部署流水线,查询模型的微小变更也需要触发整个应用的构建和部署流程,无法实现真正的独立扩展和发布。

方案B:Polyrepo下的物理隔离

该方案为命令和查询两侧分别建立独立的Git仓库,它们之间唯一的联系是一个独立版本化的“契约”仓库。

// 仓库一:order-contracts.git
// └── src/main/kotlin/.../events/OrderEvents.kt
//
// 仓库二:order-command-service.git
// └── build.gradle.kts (implementation("com.mycompany:order-contracts:1.0.1"))
//
// 仓库三:order-query-service.git
// └── build.gradle.kts (implementation("com.mycompany:order-contracts:1.0.1"))

这种结构从物理上杜绝了跨边界引用的可能性。order-query-service的构建环境里根本不存在order-command-service的源代码,耦合无从发生。

优势分析:

  1. 强制解耦: 物理隔离是保障边界最强有力的手段。团队必须通过预先定义的事件或API契约进行通信。
  2. 独立生命周期: 每个服务拥有自己的构建、测试、部署流水线和版本号。查询服务可以独立于命令服务进行部署和扩缩容。
  3. 技术异构性: 命令侧和查询侧可以选用完全不同的技术栈。例如,命令侧使用事件溯源和内存数据库,而查询侧使用Elasticsearch进行全文检索。
  4. 团队所有权清晰: 不同的团队可以完全拥有各自的服务仓库,权责分明。

劣势与挑战:

  1. 基础设施复杂性: 需要维护多个Git仓库、多个CI/CD流水线以及一个私有制品库(如Artifactory, Nexus)来托管契约包。
  2. 本地开发环境: 开发者需要同时运行多个服务、一个消息中间件和一个数据库,通常需要借助Docker Compose等工具来管理。
  3. 契约版本管理: 共享的order-contracts包的版本管理变得至关重要。升级契约版本需要在多个服务间进行协调,处理不当会导致兼容性问题。

最终选择与理由

对于一个追求长期可维护性、需要支持多个团队并行开发、且读写负载模式显著不同的复杂系统,我们选择方案B:Polyrepo物理隔离。尽管它带来了更高的初始基建成本和开发环境的复杂性,但其换来的是架构层面的清晰、团队的自主性以及系统演化的灵活性。这种前期投入是为了避免后期在泥潭般的耦合代码中挣扎,是一种战略性的架构决策。

核心实现概览

我们将通过一个简化的订单处理场景来展示Polyrepo下的CQRS实现。系统由三个核心仓库构成:order-contractsorder-command-serviceorder-query-service。通信通过Kafka进行。

graph TD
    subgraph "order-command-service.git"
        A[Client API] --> B{Command Handler}
        B --> C[Order Aggregate]
        C -- Persists Event --> D[Event Store - Kafka]
    end

    subgraph "order-contracts.git (Published as JAR)"
        E[Shared Events & Commands DTOs]
    end

    subgraph "order-query-service.git"
        F[Event Consumer] -- Consumes Event --> D
        F --> G{Order Projector}
        G -- Updates Read Model --> H[Query Database - PostgreSQL]
        I[Client Query API] --> J[Query Handler]
        J -- Reads from --> H
    end

    C -- Uses --> E
    F -- Uses --> E

1. 契约仓库: order-contracts

这是整个架构的基石,定义了系统内所有跨服务通信的数据结构。它的产出是一个被版本化的JAR包。

order-contracts/build.gradle.kts

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    kotlin("jvm") version "1.8.21"
    `maven-publish`
}

group = "com.mycompany.arch"
version = "1.0.0-SNAPSHOT"

repositories {
    mavenCentral()
}

// 配置发布到私有Maven仓库
publishing {
    publications {
        create<MavenPublication>("maven") {
            from(components["java"])
        }
    }
    repositories {
        maven {
            name = "nexus"
            url = uri("https://your-nexus-repo/repository/maven-snapshots/")
            credentials {
                username = System.getenv("NEXUS_USER")
                password = System.getenv("NEXUS_PASSWORD")
            }
        }
    }
}

order-contracts/src/main/kotlin/com/mycompany/arch/order/contracts.kt

package com.mycompany.arch.order

import java.math.BigDecimal
import java.time.Instant
import java.util.UUID

// --- Commands ---
// 命令是意图的表达,通常是祈使句
data class CreateOrderCommand(
    val orderId: UUID,
    val customerId: String,
    val products: List<ProductInfo>
)

data class AddProductToOrderCommand(
    val orderId: UUID,
    val productId: String,
    val quantity: Int,
    val unitPrice: BigDecimal
)

data class ConfirmOrderCommand(
    val orderId: UUID
)

// --- Events ---
// 事件是已经发生事实的记录,通常是过去式
// 使用 sealed interface 可以很好地组织领域事件
sealed interface OrderEvent {
    val orderId: UUID
    val occurredOn: Instant
}

data class OrderCreatedEvent(
    override val orderId: UUID,
    val customerId: String,
    val products: List<ProductInfo>,
    override val occurredOn: Instant = Instant.now()
) : OrderEvent

data class ProductAddedToOrderEvent(
    override val orderId: UUID,
    val productId: String,
    val quantity: Int,
    val unitPrice: BigDecimal,
    override val occurredOn: Instant = Instant.now()
) : OrderEvent

data class OrderConfirmedEvent(
    override val orderId: UUID,
    val confirmationTime: Instant,
    override val occurredOn: Instant = Instant.now()
) : OrderEvent

data class OrderCreationFailedEvent(
    override val orderId: UUID,
    val reason: String,
    override val occurredOn: Instant = Instant.now()
) : OrderEvent


// --- Shared DTOs ---
data class ProductInfo(
    val productId: String,
    val quantity: Int,
    val unitPrice: BigDecimal
)

这个仓库的代码极其简洁,只包含数据结构。完成后,通过./gradlew publish将其发布到Nexus。

2. 命令服务: order-command-service

这个服务负责接收命令、执行业务逻辑、并产生事件。

order-command-service/build.gradle.kts

plugins {
    id("org.springframework.boot") version "3.1.5"
    id("io.spring.dependency-management") version "1.1.3"
    kotlin("jvm") version "1.8.21"
    kotlin("plugin.spring") version "1.8.21"
}

// ... spring boot config ...

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.kafka:spring-kafka")
    
    // 关键:依赖于发布的契约包,而不是本地项目
    implementation("com.mycompany.arch:order-contracts:1.0.0-SNAPSHOT")
    
    // Jackson, logging, etc.
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    
    // 测试依赖
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.kafka:spring-kafka-test")
}

order-command-service/src/main/kotlin/.../domain/OrderAggregate.kt

package com.mycompany.arch.order.domain

import com.mycompany.arch.order.*
import java.math.BigDecimal
import java.util.UUID
import org.slf4j.LoggerFactory

// 这是一个简化的聚合根,用于演示业务逻辑和事件生成
// 在真实项目中,这会更复杂,并且会使用事件溯源
class OrderAggregate(private val state: OrderState) {
    
    private val log = LoggerFactory.getLogger(javaClass)

    // 业务方法处理命令,返回事件列表
    fun handle(command: CreateOrderCommand): List<OrderEvent> {
        if (state.status != OrderStatus.NON_EXISTENT) {
            log.warn("Order {} already exists. Ignoring CreateOrderCommand.", command.orderId)
            return listOf(OrderCreationFailedEvent(command.orderId, "Order already exists"))
        }
        if (command.products.isEmpty()) {
            log.error("Cannot create order {} with no products.", command.orderId)
            return listOf(OrderCreationFailedEvent(command.orderId, "Cannot create order with empty products"))
        }
        
        return listOf(OrderCreatedEvent(command.orderId, command.customerId, command.products))
    }

    fun handle(command: ConfirmOrderCommand): List<OrderEvent> {
        if (state.status != OrderStatus.PENDING) {
            log.error("Cannot confirm order {} in state {}", command.orderId, state.status)
            throw IllegalStateException("Order must be in PENDING state to be confirmed")
        }
        
        return listOf(OrderConfirmedEvent(command.orderId, confirmationTime = java.time.Instant.now()))
    }
    
    // ... 其他命令处理方法 ...

    companion object {
        // 用于从事件历史中重建聚合状态
        fun rehydrate(events: List<OrderEvent>): OrderAggregate {
            val initialState = OrderState(UUID.randomUUID(), "", OrderStatus.NON_EXISTENT, emptyList(), BigDecimal.ZERO)
            val finalState = events.fold(initialState) { state, event -> applyEvent(state, event) }
            return OrderAggregate(finalState)
        }
        
        // 状态转换逻辑
        private fun applyEvent(currentState: OrderState, event: OrderEvent): OrderState {
            return when (event) {
                is OrderCreatedEvent -> currentState.copy(
                    id = event.orderId,
                    customerId = event.customerId,
                    status = OrderStatus.PENDING,
                    products = event.products,
                    totalAmount = event.products.sumOf { it.unitPrice * BigDecimal(it.quantity) }
                )
                is OrderConfirmedEvent -> currentState.copy(status = OrderStatus.CONFIRMED)
                // ... 其他事件应用逻辑 ...
                else -> currentState
            }
        }
    }
}

// 聚合的内部状态
data class OrderState(
    val id: UUID,
    val customerId: String,
    val status: OrderStatus,
    val products: List<ProductInfo>,
    val totalAmount: BigDecimal
)

enum class OrderStatus { NON_EXISTENT, PENDING, CONFIRMED, CANCELLED }

order-command-service/src/main/kotlin/.../service/OrderCommandHandler.kt

package com.mycompany.arch.order.service

import com.mycompany.arch.order.*
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service

@Service
class OrderCommandHandler(
    // 假设有一个事件溯源的仓储
    private val orderRepository: OrderEventSourcedRepository, 
    private val kafkaTemplate: KafkaTemplate<String, OrderEvent>
) {
    private val log = LoggerFactory.getLogger(javaClass)
    private val orderEventTopic = "order-events"

    fun process(command: CreateOrderCommand) {
        // 在真实项目中,应该先加载聚合,但这里是创建,所以直接构建
        val emptyAggregate = orderRepository.load(command.orderId) // 会返回一个初始状态的聚合
        
        val events = try {
            emptyAggregate.handle(command)
        } catch (e: Exception) {
            log.error("Business logic exception for command {}: {}", command, e.message)
            // 可以发布一个失败事件
            return
        }

        if (events.isNotEmpty()) {
            orderRepository.save(command.orderId, events)
            events.forEach { event ->
                // 使用事务性发件箱模式来保证事件一定能被发出
                // 这里为了简化,直接发送
                kafkaTemplate.send(orderEventTopic, event.orderId.toString(), event)
                    .whenComplete { result, ex ->
                        if (ex != null) {
                            log.error("Failed to send event {} to Kafka", event, ex)
                            // 需要有重试或死信队列机制
                        } else {
                            log.info("Event {} sent to topic {}", event.javaClass.simpleName, result?.recordMetadata?.topic())
                        }
                    }
            }
        }
    }
    
    // ... 其他命令的处理方法 ...
}

// 仓储的接口,具体实现可以是基于 Kafka Streams, Akka Persistence, Axon Framework 等
interface OrderEventSourcedRepository {
    fun save(aggregateId: UUID, events: List<OrderEvent>)
    fun load(aggregateId: UUID): OrderAggregate
}

3. 查询服务: order-query-service

该服务消费事件,构建并维护一个用于查询的物化视图。

order-query-service/build.gradle.kts

// ... 插件配置与命令服务类似 ...
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")
    implementation("org.springframework.kafka:spring-kafka")
    runtimeOnly("org.postgresql:postgresql")

    // 同样依赖于发布的契约包
    implementation("com.mycompany.arch:order-contracts:1.0.0-SNAPSHOT")

    // ... 其他依赖 ...
}

order-query-service/src/main/kotlin/.../projection/OrderSummaryProjection.kt

package com.mycompany.arch.order.projection

import com.mycompany.arch.order.*
import org.slf4j.LoggerFactory
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
import org.springframework.stereotype.Repository
import jakarta.persistence.*
import java.math.BigDecimal
import java.time.Instant
import java.util.UUID

@Component
class OrderSummaryProjection(private val repository: OrderSummaryRepository) {

    private val log = LoggerFactory.getLogger(javaClass)
    
    // 监听事件主题
    @KafkaListener(topics = ["order-events"], groupId = "order-summary-projector")
    fun handle(event: OrderEvent) {
        log.info("Received event: {}", event.javaClass.simpleName)
        try {
            when (event) {
                is OrderCreatedEvent -> createOrderSummary(event)
                is OrderConfirmedEvent -> confirmOrderSummary(event)
                // 对于不关心的事件,直接忽略
                else -> log.debug("Ignoring event of type {}", event.javaClass.simpleName)
            }
        } catch (e: Exception) {
            // 这里的错误处理至关重要
            // 如果处理失败,应该进入死信队列,人工干预,否则会阻塞整个分区的消费
            log.error("Failed to project event {}: {}", event, e.message, e)
            throw e // 抛出异常,让 Kafka consumer 进行重试或进入死信
        }
    }
    
    private fun createOrderSummary(event: OrderCreatedEvent) {
        val totalAmount = event.products.sumOf { it.unitPrice * BigDecimal(it.quantity) }
        val summary = OrderSummary(
            id = event.orderId,
            customerId = event.customerId,
            status = "PENDING",
            totalAmount = totalAmount,
            productCount = event.products.sumOf { it.quantity },
            createdAt = event.occurredOn,
            updatedAt = event.occurredOn
        )
        repository.save(summary)
        log.info("Created order summary for orderId {}", event.orderId)
    }
    
    private fun confirmOrderSummary(event: OrderConfirmedEvent) {
        val summary = repository.findById(event.orderId)
            .orElseThrow { IllegalStateException("Order summary not found for id ${event.orderId}") }
        
        summary.status = "CONFIRMED"
        summary.updatedAt = event.occurredOn
        repository.save(summary)
        log.info("Confirmed order summary for orderId {}", event.orderId)
    }
}

// 查询模型 (Read Model)
@Entity
@Table(name = "order_summaries")
data class OrderSummary(
    @Id
    var id: UUID,
    var customerId: String,
    var status: String,
    var totalAmount: BigDecimal,
    var productCount: Int,
    var createdAt: Instant,
    var updatedAt: Instant
)

// 使用 Spring Data JPA 简化数据库操作
@Repository
interface OrderSummaryRepository : JpaRepository<OrderSummary, UUID>

架构的扩展性与局限性

这种物理隔离的架构模式提供了极佳的扩展性。例如,如果需要一个为数据分析团队服务的、包含订单明细的只读模型,我们只需创建一个新的order-detail-query-service仓库,让它同样消费order-events主题的事件,并将其投射到ClickHouse或类似的数据仓库中。这一切都无需对命令服务或现有的查询服务做任何改动。

然而,该架构的局限性也同样明显:

  1. 最终一致性: 从命令执行成功到查询模型更新之间存在延迟。UI设计必须能处理这种延迟,例如通过轮询、WebSocket推送或向用户显示“订单处理中”的状态。这不是一个技术问题,而是一个必须与产品和业务达成共识的权衡。
  2. 开发运维成本: Polyrepo显著增加了运维的复杂性。需要自动化的CI/CD流水线、统一的日志收集、分布式追踪(如OpenTelemetry)来诊断跨服务问题。本地开发环境的搭建也对新成员有一定的学习曲线。
  3. 事件契约演进: order-contracts是系统的“脊椎”。对事件结构进行不兼容的修改(如删除字段)是一项高风险操作,需要精心的版本控制策略(如Schema Registry、多版本事件处理器)和跨团队的协调发布,以确保新旧服务都能正确处理过渡期的事件。

  目录