一个棘手的生产需求摆在了面前:我们需要为核心风控系统提供近乎实时的关联关系特征。例如,在用户发起一笔交易时,系统必须在50毫秒内判断该用户的设备、IP地址是否与已知的欺诈账户存在两度以内的关联。传统的做法是使用离线图计算(如Spark GraphX),每天生成图特征快照,但这无法捕捉到几分钟内新形成的欺诈团伙。数据延迟是致命的。
这就引出了一个核心的技术挑战:如何构建一个能够每秒处理数十万事件、动态更新图结构,并能为在线服务提供毫秒级图特征查询的系统。
定义复杂技术问题
经过梳理,我们的目标系统必须满足以下几个硬性指标:
- 极低查询延迟: 特征服务接口的 P99 延迟必须低于 20ms。这是在线同步调用的红线。
- 高吞吐事件流: 系统需要稳定消费上游的 Kafka topic,峰值流量可达 200,000 events/sec。事件类型包括用户登录、交易、设备信息变更等。
- 动态图更新: 图谱结构不是静态的。新的用户、设备会作为节点加入,用户与设备、IP之间的关系会作为边动态创建。
- 成本可控与运维简化: 团队规模有限,我们无法接受一个需要专人长期维护、调优的复杂数据库集群。方案必须倾向于利用托管服务和Serverless理念。
方案A:专用图数据库(如 Amazon Neptune)
第一个进入视野的自然是专用的图数据库。架构会是这样的:
graph TD A[Kafka Event Stream] --> B{Apache Flink Job}; B -- Simple Enrichment & Formatting --> C[Amazon Neptune]; D[Online Service] -- Gremlin/SPARQL Query --> C;
优势分析:
- 强大的查询能力: Neptune 原生支持 Gremlin 和 SPARQL 查询语言,对于复杂的、任意深度的图遍历、路径查找、模式匹配等查询,表现非常出色。我们可以轻松实现“查找N度关系内的所有欺诈节点”这类需求。
- 功能成熟: 作为托管服务,它解决了备份、恢复、补丁等运维难题。
劣势分析:
- 写入性能瓶颈: 图数据库为了维护复杂的索引和事务一致性,其写入性能通常不是强项。在高吞吐的流式写入场景下,很容易达到写入瓶颈或产生热点问题。
- 查询延迟不确定性: 虽然简单查询很快,但复杂 Gremlin 查询的性能可能会随着图的复杂度和数据规模增长而变得不可预测。对于我们要求 P99 低于 20ms 的硬性指标,这是一个巨大的风险。
- 成本与运维: Neptune 是基于实例付费的,为了应对峰值流量,我们需要预置相当规模的写入器和读取器实例,大部分时间资源可能是闲置的,成本较高。尽管是托管服务,其参数调优和性能监控仍然需要专业知识。
在真实项目中,为了保证核心交易链路的稳定性,不可预测的延迟是大忌。方案A的查询灵活性很诱人,但其性能不确定性和成本让我们望而却步。
方案B:Flink 有状态计算 + DynamoDB KV 存储
这个方案转变了思路:将图的计算与存储分离。让 Flink 负责所有复杂的图计算和状态维护,而将一个高度优化的键值数据库用作查询服务层。
graph TD subgraph Real-time Computation Layer A[Kafka Event Stream] --> B{Flink Stateful Job: Graph Processor}; end subgraph Low-Latency Serving Layer C[DynamoDB Table: Adjacency List Model]; end subgraph Application Layer D[Online Service]; end B -- Pre-computed Features & Graph Edges --> C; D -- Simple GetItem/Query --> C;
优势分析:
- 极致的查询性能: DynamoDB 是一个为可预测的、低延迟读写而设计的数据库。通过精心设计的数据模型(如下文将详述的邻接表模式),我们可以将复杂的图遍历转化为对 DynamoDB 的若干次
GetItem
或Query
操作,P99 延迟稳定在个位数毫秒是完全可行的。 - 水平扩展的写入能力: Flink 和 DynamoDB 都是为水平扩展而生。Flink 的并行度可以根据 Kafka 分区数动态调整,而 DynamoDB 的写入容量可以按需自动伸缩。这完美匹配我们高吞吐的写入需求。
- 成本效益与Serverless: Flink 可以部署在 K8s 或 Flink on Kinesis Data Analytics 上,实现资源弹性。DynamoDB 按实际使用量付费,没有闲置成本。整个架构的运维负担极小。
劣势分析:
- 查询能力受限: 这是最大的妥协。我们无法执行任意的、即席的图查询。所有需要查询的特征(如图深度、节点属性聚合)都必须预先在 Flink 作业中计算好,并物化到 DynamoDB。图的“逻辑”被硬编码在了 Flink 应用代码中。
- 应用逻辑复杂: 图的构建、更新、过期等所有逻辑都需要在 Flink 的有状态算子中自行实现。这对开发人员的 Flink 编程能力和状态管理经验提出了更高的要求。
最终选择与理由
我们最终选择了方案B。
决策的核心理由是:为核心在线业务的性能确定性负责。风控场景下,99.9% 的查询都是固定的几种模式,例如“查询一度、二度关联节点”。为了这几种高频、低延迟的查询,牺牲掉几乎用不上的任意图分析能力,是一个完全合理的工程权衡。方案B将复杂性从不可控的数据库性能转移到了可控的、可测试的 Flink 应用代码中。这让我们对系统的最终性能表现更有信心。
核心实现概览
1. DynamoDB 的邻接表(Adjacency List)数据模型
这是将图结构存储在非图数据库中的关键。我们设计一个单独的表,使用复合主键来同时存储节点和边。
- 表名:
GraphFeatureStore
- 主键:
-
PK
(Partition Key): 节点 ID,例如USER#123
或DEVICE#ABC
-
SK
(Sort Key): 描述与该节点相关的关系或属性。
-
- GSI (Global Secondary Index):
- Index Name:
inverted-index
- 主键:
SK
(Partition Key),PK
(Sort Key) - 用于反向查询,例如“查找所有关联到此设备的账户”。
- Index Name:
数据示例:
PK | SK | Type | Timestamp | FraudScore |
---|---|---|---|---|
USER#123 |
METADATA#USER |
UserNode |
1667889900 |
0.1 |
USER#123 |
DEVICE#ABC |
HasDevice |
1667889910 |
- |
USER#123 |
IP#1.2.3.4 |
HasIp |
1667889920 |
- |
DEVICE#ABC |
METADATA#DEVICE |
DeviceNode |
1667889910 |
0.8 |
DEVICE#ABC |
USER#123 |
UsedByUser |
1667889910 |
- |
通过这个模型:
- 查询用户123的所有关联(一度关系):
Query
onGraphFeatureStore
wherePK = "USER#123"
。 - 查询设备ABC被哪些用户使用:
Query
oninverted-index
wherePK = "DEVICE#ABC"
(在索引中,PK是原表的SK)。
2. Flink 作业实现
我们将使用 Java 来实现 Flink 作业。
2.1 输入事件与图元素定义
首先,定义清晰的数据结构。
// src/main/java/com/mycorp/flink/model/InputEvent.java
package com.mycorp.flink.model;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
// 假设这是从 Kafka 中消费的 JSON 事件反序列化后的对象
@Data
@NoArgsConstructor
@AllArgsConstructor
public class InputEvent {
private String eventId;
private String eventType; // e.g., "USER_LOGIN"
private long eventTimestamp;
private String userId;
private String deviceId;
private String ipAddress;
}
// src/main/java/com/mycorp/flink/model/GraphUpdate.java
package com.mycorp.flink.model;
import lombok.Data;
import lombok.Builder;
// Flink 作业内部流动的数据,表示一个图的更新操作
@Data
@Builder
public class GraphUpdate {
// DynamoDB Table's PK
private String pk;
// DynamoDB Table's SK
private String sk;
// e.g., UserNode, DeviceNode, HasDeviceEdge
private String type;
private long timestamp;
// Flag to indicate if this is a deletion
private boolean isDelete = false;
// ... 其他需要更新的属性
}
2.2 核心图处理逻辑:KeyedProcessFunction
这是整个系统的“大脑”。它消费事件流,将其转换为图的更新,并向下游发送。
// src/main/java/com/mycorp/flink/operator/EventToGraphProcessor.java
package com.mycorp.flink.operator;
import com.mycorp.flink.model.InputEvent;
import com.mycorp.flink.model.GraphUpdate;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EventToGraphProcessor extends ProcessFunction<InputEvent, GraphUpdate> {
private static final Logger LOG = LoggerFactory.getLogger(EventToGraphProcessor.class);
@Override
public void processElement(InputEvent event, Context ctx, Collector<GraphUpdate> out) throws Exception {
LOG.debug("Processing event: {}", event.getEventId());
if (event.getUserId() == null || (event.getDeviceId() == null && event.getIpAddress() == null)) {
LOG.warn("Skipping event with missing critical identifiers: {}", event.getEventId());
return;
}
String userPk = "USER#" + event.getUserId();
// 1. 发出用户节点更新 (即使节点已存在,这也可以用于更新时间戳等属性)
out.collect(GraphUpdate.builder()
.pk(userPk)
.sk("METADATA#USER")
.type("UserNode")
.timestamp(event.getEventTimestamp())
.build());
// 2. 处理与设备的关联
if (event.getDeviceId() != null) {
String devicePk = "DEVICE#" + event.getDeviceId();
// 发出设备节点更新
out.collect(GraphUpdate.builder()
.pk(devicePk)
.sk("METADATA#DEVICE")
.type("DeviceNode")
.timestamp(event.getEventTimestamp())
.build());
// 创建正向边: User -> Device
out.collect(GraphUpdate.builder()
.pk(userPk)
.sk(devicePk)
.type("HasDevice")
.timestamp(event.getEventTimestamp())
.build());
// 创建反向边: Device -> User (用于GSI查询)
out.collect(GraphUpdate.builder()
.pk(devicePk)
.sk(userPk)
.type("UsedByUser")
.timestamp(event.getEventTimestamp())
.build());
}
// 3. 处理与IP的关联 (逻辑类似)
if (event.getIpAddress() != null) {
String ipPk = "IP#" + event.getIpAddress();
out.collect(GraphUpdate.builder().pk(ipPk).sk("METADATA#IP").type("IpNode").timestamp(event.getEventTimestamp()).build());
out.collect(GraphUpdate.builder().pk(userPk).sk(ipPk).type("HasIp").timestamp(event.getEventTimestamp()).build());
out.collect(GraphUpdate.builder().pk(ipPk).sk(userPk).type("UsedByUser").timestamp(event.getEventTimestamp()).build());
}
}
}
这里的 ProcessFunction
是无状态的,因为它只是简单地将每个事件转换为一组图更新操作。在更复杂的场景中,例如计算节点的度(degree)或执行窗口聚合,我们会使用 KeyedProcessFunction
并利用 Flink 的状态后端(ValueState
, MapState
)来存储中间结果。
2.3 自定义 DynamoDB Sink
虽然 Flink 社区提供了一些通用的异步IO工具和连接器,但对于 DynamoDB 这种需要精细化控制(如批量写入、错误重试、吞吐量控制)的场景,自定义一个 RichSinkFunction
往往能获得更好的性能和稳定性。
// src/main/java/com/mycorp/flink/sink/DynamoDbBatchSink.java
package com.mycorp.flink.sink;
import com.mycorp.flink.model.GraphUpdate;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DynamoDbBatchSink extends RichSinkFunction<GraphUpdate> {
private static final Logger LOG = LoggerFactory.getLogger(DynamoDbBatchSink.class);
private final String tableName;
private final int batchSize;
private final long batchIntervalMs;
private transient DynamoDbClient dynamoDbClient;
private transient List<WriteRequest> buffer;
private transient long lastFlushTime;
public DynamoDbBatchSink(String tableName, int batchSize, long batchIntervalMs) {
this.tableName = tableName;
this.batchSize = batchSize;
this.batchIntervalMs = batchIntervalMs;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.buffer = new ArrayList<>();
this.lastFlushTime = System.currentTimeMillis();
// 生产级的 SDK 配置:包含重试策略和超时
RetryPolicy retryPolicy = RetryPolicy.builder()
.numRetries(5)
.backoffStrategy(EqualJitterBackoffStrategy.builder()
.baseDelay(Duration.ofMillis(100))
.maxBackoffTime(Duration.ofSeconds(2))
.build())
.build();
ClientOverrideConfiguration clientConfig = ClientOverrideConfiguration.builder()
.apiCallAttemptTimeout(Duration.ofSeconds(2))
.apiCallTimeout(Duration.ofSeconds(5))
.retryPolicy(retryPolicy)
.build();
this.dynamoDbClient = DynamoDbClient.builder()
.region(Region.AP_NORTHEAST_1) // 从配置中读取
.credentialsProvider(DefaultCredentialsProvider.create())
.overrideConfiguration(clientConfig)
.build();
}
@Override
public void invoke(GraphUpdate value, Context context) throws Exception {
// 将 GraphUpdate 对象转换为 DynamoDB 的 WriteRequest
Map<String, AttributeValue> item = new HashMap<>();
item.put("PK", AttributeValue.builder().s(value.getPk()).build());
item.put("SK", AttributeValue.builder().s(value.getSk()).build());
item.put("Type", AttributeValue.builder().s(value.getType()).build());
item.put("Timestamp", AttributeValue.builder().n(String.valueOf(value.getTimestamp())).build());
PutRequest putRequest = PutRequest.builder().item(item).build();
WriteRequest writeRequest = WriteRequest.builder().putRequest(putRequest).build();
buffer.add(writeRequest);
if (buffer.size() >= batchSize || System.currentTimeMillis() - lastFlushTime >= batchIntervalMs) {
flush();
}
}
@Override
public void close() throws Exception {
flush(); // 确保关闭前所有缓冲数据都被写入
if (dynamoDbClient != null) {
dynamoDbClient.close();
}
super.close();
}
private void flush() {
if (buffer.isEmpty()) {
return;
}
LOG.info("Flushing {} items to DynamoDB.", buffer.size());
try {
Map<String, List<WriteRequest>> requestItems = new HashMap<>();
requestItems.put(tableName, new ArrayList<>(buffer)); // 拷贝一份,避免并发修改
BatchWriteItemRequest request = BatchWriteItemRequest.builder()
.requestItems(requestItems)
.build();
// DynamoDB BatchWriteItem 有可能部分成功,生产代码需要处理未成功的item并重试
BatchWriteItemResponse response = dynamoDbClient.batchWriteItem(request);
if (response.hasUnprocessedItems() && !response.unprocessedItems().isEmpty()) {
LOG.error("Failed to write all items to DynamoDB. Unprocessed items count: {}",
response.unprocessedItems().get(tableName).size());
// 在真实项目中,这里必须有健壮的重试或告警逻辑,甚至可以将失败的请求发到死信队列
}
} catch (DynamoDbException e) {
LOG.error("Error writing batch to DynamoDB. Retries might be attempted by SDK.", e);
// 抛出异常可能导致 Flink 作业重启,这是一种可行的容错策略
throw new RuntimeException("Fatal error during DynamoDB flush", e);
} finally {
buffer.clear();
lastFlushTime = System.currentTimeMillis();
}
}
}
这个 Sink 的关键在于 flush
方法。它利用了 DynamoDB 的 BatchWriteItem
API,一次最多可以写入25个 item,这比单次 PutItem
效率高得多,也更节省成本。同时,我们实现了基于大小和时间的双重触发机制,确保数据既能及时写入,也能在高负载时利用批处理的优势。生产级的代码必须处理 unprocessedItems
,这是 DynamoDB 在限流等情况下返回的部分失败项。
2.4 组装 Flink 作业
最后,将所有组件串联起来。
// src/main/java/com/mycorp/flink/GraphFeatureJob.java
package com.mycorp.flink;
import com.mycorp.flink.model.InputEvent;
import com.mycorp.flink.operator.EventToGraphProcessor;
import com.mycorp.flink.sink.DynamoDbBatchSink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import com.google.gson.Gson;
public class GraphFeatureJob {
public static void main(String[] args) throws Exception {
// 1. 获取执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 生产环境建议开启 Checkpointing
// env.enableCheckpointing(60000);
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 2. 配置 Kafka Source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
.setTopics("user-events-topic")
.setGroupId("flink-graph-processor-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. JSON 解析与转换
Gson gson = new Gson();
DataStream<InputEvent> eventStream = kafkaStream.process(new ProcessFunction<String, InputEvent>() {
@Override
public void processElement(String value, Context ctx, Collector<InputEvent> out) {
try {
InputEvent event = gson.fromJson(value, InputEvent.class);
if (event != null && event.getEventId() != null) {
out.collect(event);
}
} catch (Exception e) {
// 记录脏数据,而不是让作业失败
// getRuntimeContext().getMetricGroup().counter("dirtyRecords").inc();
}
}
});
// 4. 应用图处理逻辑
DataStream<GraphUpdate> graphUpdateStream = eventStream.process(new EventToGraphProcessor());
// 5. 写入 DynamoDB
graphUpdateStream.addSink(new DynamoDbBatchSink("GraphFeatureStore", 25, 1000));
// 6. 执行作业
env.execute("Real-time Graph Feature Engineering Job");
}
}
架构的扩展性与局限性
这个架构的扩展点非常清晰。当需要计算新的图特征时,例如“某个IP地址在过去一小时内关联的用户数”,我们可以新增一个 KeyedProcessFunction
算子。这个算子以 ipAddress
为 key,使用 Flink 的窗口 API 和状态来执行聚合计算,然后将结果(一个聚合特征)同样写入 DynamoDB 的 METADATA
行。
然而,其局限性也同样明显。我们构建的不是一个通用的图数据库,而是一个为特定查询模式高度优化的“图特征物化视图”。如果业务方突然提出一个需要深度遍历或全局扫描的临时分析需求,这个架构将无能为力。这种需求更适合交由离线的数据仓库(如 Snowflake, BigQuery)或一个专用的 OLAP 图分析引擎来处理。
另一个潜在的挑战是 Flink 的状态管理。随着图的规模增长,尤其是在节点度极不均衡(存在超级节点)的情况下,Flink 算子的状态可能会变得非常庞大,对 RocksDB 状态后端的性能和 Checkpoint 的时间造成压力。对此,我们需要设计合理的 TTL 策略来自动清理过期的状态和边,并对超级节点的处理逻辑进行特殊优化,比如采样或拆分。