利用 Flink 与 DynamoDB 构建实时图特征计算与服务架构


一个棘手的生产需求摆在了面前:我们需要为核心风控系统提供近乎实时的关联关系特征。例如,在用户发起一笔交易时,系统必须在50毫秒内判断该用户的设备、IP地址是否与已知的欺诈账户存在两度以内的关联。传统的做法是使用离线图计算(如Spark GraphX),每天生成图特征快照,但这无法捕捉到几分钟内新形成的欺诈团伙。数据延迟是致命的。

这就引出了一个核心的技术挑战:如何构建一个能够每秒处理数十万事件、动态更新图结构,并能为在线服务提供毫秒级图特征查询的系统。

定义复杂技术问题

经过梳理,我们的目标系统必须满足以下几个硬性指标:

  1. 极低查询延迟: 特征服务接口的 P99 延迟必须低于 20ms。这是在线同步调用的红线。
  2. 高吞吐事件流: 系统需要稳定消费上游的 Kafka topic,峰值流量可达 200,000 events/sec。事件类型包括用户登录、交易、设备信息变更等。
  3. 动态图更新: 图谱结构不是静态的。新的用户、设备会作为节点加入,用户与设备、IP之间的关系会作为边动态创建。
  4. 成本可控与运维简化: 团队规模有限,我们无法接受一个需要专人长期维护、调优的复杂数据库集群。方案必须倾向于利用托管服务和Serverless理念。

方案A:专用图数据库(如 Amazon Neptune)

第一个进入视野的自然是专用的图数据库。架构会是这样的:

graph TD
    A[Kafka Event Stream] --> B{Apache Flink Job};
    B -- Simple Enrichment & Formatting --> C[Amazon Neptune];
    D[Online Service] -- Gremlin/SPARQL Query --> C;

优势分析:

  • 强大的查询能力: Neptune 原生支持 Gremlin 和 SPARQL 查询语言,对于复杂的、任意深度的图遍历、路径查找、模式匹配等查询,表现非常出色。我们可以轻松实现“查找N度关系内的所有欺诈节点”这类需求。
  • 功能成熟: 作为托管服务,它解决了备份、恢复、补丁等运维难题。

劣势分析:

  • 写入性能瓶颈: 图数据库为了维护复杂的索引和事务一致性,其写入性能通常不是强项。在高吞吐的流式写入场景下,很容易达到写入瓶颈或产生热点问题。
  • 查询延迟不确定性: 虽然简单查询很快,但复杂 Gremlin 查询的性能可能会随着图的复杂度和数据规模增长而变得不可预测。对于我们要求 P99 低于 20ms 的硬性指标,这是一个巨大的风险。
  • 成本与运维: Neptune 是基于实例付费的,为了应对峰值流量,我们需要预置相当规模的写入器和读取器实例,大部分时间资源可能是闲置的,成本较高。尽管是托管服务,其参数调优和性能监控仍然需要专业知识。

在真实项目中,为了保证核心交易链路的稳定性,不可预测的延迟是大忌。方案A的查询灵活性很诱人,但其性能不确定性和成本让我们望而却步。

这个方案转变了思路:将图的计算与存储分离。让 Flink 负责所有复杂的图计算和状态维护,而将一个高度优化的键值数据库用作查询服务层。

graph TD
    subgraph Real-time Computation Layer
        A[Kafka Event Stream] --> B{Flink Stateful Job: Graph Processor};
    end

    subgraph Low-Latency Serving Layer
        C[DynamoDB Table: Adjacency List Model];
    end

    subgraph Application Layer
        D[Online Service];
    end

    B -- Pre-computed Features & Graph Edges --> C;
    D -- Simple GetItem/Query --> C;

优势分析:

  • 极致的查询性能: DynamoDB 是一个为可预测的、低延迟读写而设计的数据库。通过精心设计的数据模型(如下文将详述的邻接表模式),我们可以将复杂的图遍历转化为对 DynamoDB 的若干次 GetItemQuery 操作,P99 延迟稳定在个位数毫秒是完全可行的。
  • 水平扩展的写入能力: Flink 和 DynamoDB 都是为水平扩展而生。Flink 的并行度可以根据 Kafka 分区数动态调整,而 DynamoDB 的写入容量可以按需自动伸缩。这完美匹配我们高吞吐的写入需求。
  • 成本效益与Serverless: Flink 可以部署在 K8s 或 Flink on Kinesis Data Analytics 上,实现资源弹性。DynamoDB 按实际使用量付费,没有闲置成本。整个架构的运维负担极小。

劣势分析:

  • 查询能力受限: 这是最大的妥协。我们无法执行任意的、即席的图查询。所有需要查询的特征(如图深度、节点属性聚合)都必须预先在 Flink 作业中计算好,并物化到 DynamoDB。图的“逻辑”被硬编码在了 Flink 应用代码中。
  • 应用逻辑复杂: 图的构建、更新、过期等所有逻辑都需要在 Flink 的有状态算子中自行实现。这对开发人员的 Flink 编程能力和状态管理经验提出了更高的要求。

最终选择与理由

我们最终选择了方案B

决策的核心理由是:为核心在线业务的性能确定性负责。风控场景下,99.9% 的查询都是固定的几种模式,例如“查询一度、二度关联节点”。为了这几种高频、低延迟的查询,牺牲掉几乎用不上的任意图分析能力,是一个完全合理的工程权衡。方案B将复杂性从不可控的数据库性能转移到了可控的、可测试的 Flink 应用代码中。这让我们对系统的最终性能表现更有信心。

核心实现概览

1. DynamoDB 的邻接表(Adjacency List)数据模型

这是将图结构存储在非图数据库中的关键。我们设计一个单独的表,使用复合主键来同时存储节点和边。

  • 表名: GraphFeatureStore
  • 主键:
    • PK (Partition Key): 节点 ID,例如 USER#123DEVICE#ABC
    • SK (Sort Key): 描述与该节点相关的关系或属性。
  • GSI (Global Secondary Index):
    • Index Name: inverted-index
    • 主键: SK (Partition Key), PK (Sort Key) - 用于反向查询,例如“查找所有关联到此设备的账户”。

数据示例:

PK SK Type Timestamp FraudScore
USER#123 METADATA#USER UserNode 1667889900 0.1
USER#123 DEVICE#ABC HasDevice 1667889910 -
USER#123 IP#1.2.3.4 HasIp 1667889920 -
DEVICE#ABC METADATA#DEVICE DeviceNode 1667889910 0.8
DEVICE#ABC USER#123 UsedByUser 1667889910 -

通过这个模型:

  • 查询用户123的所有关联(一度关系): Query on GraphFeatureStore where PK = "USER#123"
  • 查询设备ABC被哪些用户使用: Query on inverted-index where PK = "DEVICE#ABC" (在索引中,PK是原表的SK)。

我们将使用 Java 来实现 Flink 作业。

2.1 输入事件与图元素定义

首先,定义清晰的数据结构。

// src/main/java/com/mycorp/flink/model/InputEvent.java
package com.mycorp.flink.model;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;

// 假设这是从 Kafka 中消费的 JSON 事件反序列化后的对象
@Data
@NoArgsConstructor
@AllArgsConstructor
public class InputEvent {
    private String eventId;
    private String eventType; // e.g., "USER_LOGIN"
    private long eventTimestamp;
    private String userId;
    private String deviceId;
    private String ipAddress;
}

// src/main/java/com/mycorp/flink/model/GraphUpdate.java
package com.mycorp.flink.model;

import lombok.Data;
import lombok.Builder;

// Flink 作业内部流动的数据,表示一个图的更新操作
@Data
@Builder
public class GraphUpdate {
    // DynamoDB Table's PK
    private String pk;
    // DynamoDB Table's SK
    private String sk;
    // e.g., UserNode, DeviceNode, HasDeviceEdge
    private String type;
    private long timestamp;
    
    // Flag to indicate if this is a deletion
    private boolean isDelete = false;

    // ... 其他需要更新的属性
}

2.2 核心图处理逻辑:KeyedProcessFunction

这是整个系统的“大脑”。它消费事件流,将其转换为图的更新,并向下游发送。

// src/main/java/com/mycorp/flink/operator/EventToGraphProcessor.java
package com.mycorp.flink.operator;

import com.mycorp.flink.model.InputEvent;
import com.mycorp.flink.model.GraphUpdate;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventToGraphProcessor extends ProcessFunction<InputEvent, GraphUpdate> {

    private static final Logger LOG = LoggerFactory.getLogger(EventToGraphProcessor.class);

    @Override
    public void processElement(InputEvent event, Context ctx, Collector<GraphUpdate> out) throws Exception {
        LOG.debug("Processing event: {}", event.getEventId());

        if (event.getUserId() == null || (event.getDeviceId() == null && event.getIpAddress() == null)) {
            LOG.warn("Skipping event with missing critical identifiers: {}", event.getEventId());
            return;
        }

        String userPk = "USER#" + event.getUserId();
        
        // 1. 发出用户节点更新 (即使节点已存在,这也可以用于更新时间戳等属性)
        out.collect(GraphUpdate.builder()
                .pk(userPk)
                .sk("METADATA#USER")
                .type("UserNode")
                .timestamp(event.getEventTimestamp())
                .build());

        // 2. 处理与设备的关联
        if (event.getDeviceId() != null) {
            String devicePk = "DEVICE#" + event.getDeviceId();
            
            // 发出设备节点更新
            out.collect(GraphUpdate.builder()
                    .pk(devicePk)
                    .sk("METADATA#DEVICE")
                    .type("DeviceNode")
                    .timestamp(event.getEventTimestamp())
                    .build());
            
            // 创建正向边: User -> Device
            out.collect(GraphUpdate.builder()
                    .pk(userPk)
                    .sk(devicePk)
                    .type("HasDevice")
                    .timestamp(event.getEventTimestamp())
                    .build());
            
            // 创建反向边: Device -> User (用于GSI查询)
            out.collect(GraphUpdate.builder()
                    .pk(devicePk)
                    .sk(userPk)
                    .type("UsedByUser")
                    .timestamp(event.getEventTimestamp())
                    .build());
        }

        // 3. 处理与IP的关联 (逻辑类似)
        if (event.getIpAddress() != null) {
            String ipPk = "IP#" + event.getIpAddress();
            out.collect(GraphUpdate.builder().pk(ipPk).sk("METADATA#IP").type("IpNode").timestamp(event.getEventTimestamp()).build());
            out.collect(GraphUpdate.builder().pk(userPk).sk(ipPk).type("HasIp").timestamp(event.getEventTimestamp()).build());
            out.collect(GraphUpdate.builder().pk(ipPk).sk(userPk).type("UsedByUser").timestamp(event.getEventTimestamp()).build());
        }
    }
}

这里的 ProcessFunction 是无状态的,因为它只是简单地将每个事件转换为一组图更新操作。在更复杂的场景中,例如计算节点的度(degree)或执行窗口聚合,我们会使用 KeyedProcessFunction 并利用 Flink 的状态后端(ValueState, MapState)来存储中间结果。

2.3 自定义 DynamoDB Sink

虽然 Flink 社区提供了一些通用的异步IO工具和连接器,但对于 DynamoDB 这种需要精细化控制(如批量写入、错误重试、吞吐量控制)的场景,自定义一个 RichSinkFunction 往往能获得更好的性能和稳定性。

// src/main/java/com/mycorp/flink/sink/DynamoDbBatchSink.java
package com.mycorp.flink.sink;

import com.mycorp.flink.model.GraphUpdate;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DynamoDbBatchSink extends RichSinkFunction<GraphUpdate> {

    private static final Logger LOG = LoggerFactory.getLogger(DynamoDbBatchSink.class);

    private final String tableName;
    private final int batchSize;
    private final long batchIntervalMs;

    private transient DynamoDbClient dynamoDbClient;
    private transient List<WriteRequest> buffer;
    private transient long lastFlushTime;

    public DynamoDbBatchSink(String tableName, int batchSize, long batchIntervalMs) {
        this.tableName = tableName;
        this.batchSize = batchSize;
        this.batchIntervalMs = batchIntervalMs;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.buffer = new ArrayList<>();
        this.lastFlushTime = System.currentTimeMillis();
        
        // 生产级的 SDK 配置:包含重试策略和超时
        RetryPolicy retryPolicy = RetryPolicy.builder()
                .numRetries(5)
                .backoffStrategy(EqualJitterBackoffStrategy.builder()
                        .baseDelay(Duration.ofMillis(100))
                        .maxBackoffTime(Duration.ofSeconds(2))
                        .build())
                .build();

        ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder()
                .apiCallAttemptTimeout(Duration.ofSeconds(2))
                .apiCallTimeout(Duration.ofSeconds(5))
                .retryPolicy(retryPolicy)
                .build();
                
        this.dynamoDbClient = DynamoDbClient.builder()
                .region(Region.AP_NORTHEAST_1) // 从配置中读取
                .credentialsProvider(DefaultCredentialsProvider.create())
                .overrideConfiguration(clientConfig)
                .build();
    }

    @Override
    public void invoke(GraphUpdate value, Context context) throws Exception {
        // 将 GraphUpdate 对象转换为 DynamoDB 的 WriteRequest
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("PK", AttributeValue.builder().s(value.getPk()).build());
        item.put("SK", AttributeValue.builder().s(value.getSk()).build());
        item.put("Type", AttributeValue.builder().s(value.getType()).build());
        item.put("Timestamp", AttributeValue.builder().n(String.valueOf(value.getTimestamp())).build());

        PutRequest putRequest = PutRequest.builder().item(item).build();
        WriteRequest writeRequest = WriteRequest.builder().putRequest(putRequest).build();
        
        buffer.add(writeRequest);

        if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime >= batchIntervalMs) {
            flush();
        }
    }
    
    @Override
    public void close() throws Exception {
        flush(); // 确保关闭前所有缓冲数据都被写入
        if (dynamoDbClient != null) {
            dynamoDbClient.close();
        }
        super.close();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }

        LOG.info("Flushing {} items to DynamoDB.", buffer.size());
        try {
            Map<String, List<WriteRequest>> requestItems = new HashMap<>();
            requestItems.put(tableName, new ArrayList<>(buffer)); // 拷贝一份,避免并发修改
            
            BatchWriteItemRequest request = BatchWriteItemRequest.builder()
                    .requestItems(requestItems)
                    .build();

            // DynamoDB BatchWriteItem 有可能部分成功,生产代码需要处理未成功的item并重试
            BatchWriteItemResponse response = dynamoDbClient.batchWriteItem(request);
            
            if (response.hasUnprocessedItems() && !response.unprocessedItems().isEmpty()) {
                 LOG.error("Failed to write all items to DynamoDB. Unprocessed items count: {}", 
                    response.unprocessedItems().get(tableName).size());
                 // 在真实项目中,这里必须有健壮的重试或告警逻辑,甚至可以将失败的请求发到死信队列
            }

        } catch (DynamoDbException e) {
            LOG.error("Error writing batch to DynamoDB. Retries might be attempted by SDK.", e);
            // 抛出异常可能导致 Flink 作业重启,这是一种可行的容错策略
            throw new RuntimeException("Fatal error during DynamoDB flush", e);
        } finally {
            buffer.clear();
            lastFlushTime = System.currentTimeMillis();
        }
    }
}

这个 Sink 的关键在于 flush 方法。它利用了 DynamoDB 的 BatchWriteItem API,一次最多可以写入25个 item,这比单次 PutItem 效率高得多,也更节省成本。同时,我们实现了基于大小和时间的双重触发机制,确保数据既能及时写入,也能在高负载时利用批处理的优势。生产级的代码必须处理 unprocessedItems,这是 DynamoDB 在限流等情况下返回的部分失败项。

2.4 组装 Flink 作业

最后,将所有组件串联起来。

// src/main/java/com/mycorp/flink/GraphFeatureJob.java
package com.mycorp.flink;

import com.mycorp.flink.model.InputEvent;
import com.mycorp.flink.operator.EventToGraphProcessor;
import com.mycorp.flink.sink.DynamoDbBatchSink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import com.google.gson.Gson;

public class GraphFeatureJob {
    public static void main(String[] args) throws Exception {
        // 1. 获取执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 生产环境建议开启 Checkpointing
        // env.enableCheckpointing(60000); 
        // env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 2. 配置 Kafka Source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
                .setTopics("user-events-topic")
                .setGroupId("flink-graph-processor-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        
        DataStream<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 3. JSON 解析与转换
        Gson gson = new Gson();
        DataStream<InputEvent> eventStream = kafkaStream.process(new ProcessFunction<String, InputEvent>() {
            @Override
            public void processElement(String value, Context ctx, Collector<InputEvent> out) {
                try {
                    InputEvent event = gson.fromJson(value, InputEvent.class);
                    if (event != null && event.getEventId() != null) {
                        out.collect(event);
                    }
                } catch (Exception e) {
                    // 记录脏数据,而不是让作业失败
                    // getRuntimeContext().getMetricGroup().counter("dirtyRecords").inc();
                }
            }
        });

        // 4. 应用图处理逻辑
        DataStream<GraphUpdate> graphUpdateStream = eventStream.process(new EventToGraphProcessor());

        // 5. 写入 DynamoDB
        graphUpdateStream.addSink(new DynamoDbBatchSink("GraphFeatureStore", 25, 1000));

        // 6. 执行作业
        env.execute("Real-time Graph Feature Engineering Job");
    }
}

架构的扩展性与局限性

这个架构的扩展点非常清晰。当需要计算新的图特征时,例如“某个IP地址在过去一小时内关联的用户数”,我们可以新增一个 KeyedProcessFunction 算子。这个算子以 ipAddress 为 key,使用 Flink 的窗口 API 和状态来执行聚合计算,然后将结果(一个聚合特征)同样写入 DynamoDB 的 METADATA 行。

然而,其局限性也同样明显。我们构建的不是一个通用的图数据库,而是一个为特定查询模式高度优化的“图特征物化视图”。如果业务方突然提出一个需要深度遍历或全局扫描的临时分析需求,这个架构将无能为力。这种需求更适合交由离线的数据仓库(如 Snowflake, BigQuery)或一个专用的 OLAP 图分析引擎来处理。

另一个潜在的挑战是 Flink 的状态管理。随着图的规模增长,尤其是在节点度极不均衡(存在超级节点)的情况下,Flink 算子的状态可能会变得非常庞大,对 RocksDB 状态后端的性能和 Checkpoint 的时间造成压力。对此,我们需要设计合理的 TTL 策略来自动清理过期的状态和边,并对超级节点的处理逻辑进行特殊优化,比如采样或拆分。


  目录