Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent-flow/src/components/base/jadeNode.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ export const jadeNode = (id, x, y, width, height, parent, drawer) => {
* @returns {number} 连接数。
*/
self.maxNumToLink = () => {
return 1;
return self.graph?.connectionLimitDisabled ? 100 : 1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image 这里量级保持一致,100->10

};

/**
Expand Down
4 changes: 3 additions & 1 deletion agent-flow/src/components/base/validator.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ export class NormalNodeConnectorValidator extends Validator {
validate() {
const nextEvents = this.node.getNextRunnableEvents();
const i18n = this.node.graph.i18n;
if (nextEvents.length !== 1) {
const isConnectionLimitDisabled = Boolean(this.node.graph?.connectionLimitDisabled);
const isValid = isConnectionLimitDisabled ? nextEvents.length >= 1 : nextEvents.length === 1;
if (!isValid) {
return Promise.reject({
errorFields: [{
errors: [`${i18n?.t('node') ?? 'node'} ${this.node.text} ${i18n?.t('problemWithConnection') ?? 'problemWithConnection'}`],
Expand Down
2 changes: 1 addition & 1 deletion agent-flow/src/components/code/codeNodeState.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export const codeNodeState = (id, x, y, width, height, parent, drawer) => {
* @override
*/
self.maxNumToLink = () => {
return 10;
return self.graph?.connectionLimitDisabled ? 100 : 10;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是聚合连线的数量,当前可以默认为10个,先不要放到100这个量级

};

return self;
Expand Down
10 changes: 8 additions & 2 deletions agent-flow/src/flow/jadeFlowEntry.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ const jadeFlowAgent = (graph) => {
graph.destroy();
};

self.setConnectionLimitDisabled = (disabled) => {
graph.connectionLimitDisabled = Boolean(disabled);
};

return self;
};

Expand Down Expand Up @@ -432,6 +436,7 @@ export const JadeFlow = (() => {
div,
tenant,
appId,
connectionLimitDisabled = false,
flowConfigData,
configs,
i18n,
Expand All @@ -440,7 +445,7 @@ export const JadeFlow = (() => {
}) => {
const graphDom = getGraphDom(div);
const g = jadeFlowGraph(div, 'jadeFlow');
await configGraph(g, tenant, appId, flowConfigData, configs, i18n, importStatements);
await configGraph(g, tenant, appId, flowConfigData, configs, i18n, importStatements, connectionLimitDisabled);
g.flowType = flowType;
const pageData = g.getPageData(0);
await g.editFlow(0, graphDom, pageData.id);
Expand Down Expand Up @@ -470,8 +475,9 @@ export const JadeFlow = (() => {
return jadeFlowAgent(g);
};

const configGraph = async (g, tenant, appId, flowConfigData, configs, i18n, importStatements) => {
const configGraph = async (g, tenant, appId, flowConfigData, configs, i18n, importStatements, connectionLimitDisabled = false) => {
g.collaboration.mute = true;
g.connectionLimitDisabled = Boolean(connectionLimitDisabled);
g.configs = configs;
g.i18n = i18n;
for (let i = 0; i < importStatements.length; i++) {
Expand Down
1 change: 1 addition & 0 deletions agent-flow/src/flow/jadeFlowGraph.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export const jadeFlowGraph = (div, title) => {
const self = defaultGraph(div, title);
self.type = 'jadeFlowGraph';
self.pageType = 'jadeFlowPage';
self.connectionLimitDisabled = false;
self.enableText = false;
self.flowMeta = {
exceptionFitables: ['modelengine.fit.jober.aipp.fitable.AippFlowExceptionHandler'],
Expand Down
8 changes: 6 additions & 2 deletions agent-flow/src/flow/jadeFlowPage.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export const jadeFlowPage = (div, graph, name, id) => {
self.addEventListener('COPY_SHAPE', shapeChangeListener);
self.addEventListener('DELETE_SHAPE', shapeChangeListener);

const isConnectionLimitDisabled = () => Boolean(self.graph?.connectionLimitDisabled);

/**
* @override
*/
Expand Down Expand Up @@ -305,7 +307,7 @@ export const jadeFlowPage = (div, graph, name, id) => {
*/
self.canDragOut = (node, connector) => {
const lines = self.getEvents().filter(s => s.fromShape === node.id && s.getDefinedFromConnector() === connector);
return lines && lines.length < 1;
return lines.length < (isConnectionLimitDisabled() ? 10 : 1);
};

/**
Expand All @@ -330,7 +332,9 @@ export const jadeFlowPage = (div, graph, name, id) => {
}
};

return jadeEvent.fromShape !== jadeEvent.toShape && isConnectorAllowToLink() && isConnectorWithinLimit();
return jadeEvent.fromShape !== jadeEvent.toShape
&& isConnectorAllowToLink()
&& (isConnectionLimitDisabled() || isConnectorWithinLimit());
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,29 @@ public <R> List<FlowContext<R>> generate(List<R> data, String position) {
return data.stream().map(d -> this.generate(d, position, LocalDateTime.now())).collect(Collectors.toList());
}

/**
* fork一个新的context用于一拖多分支,继承当前context的运行元数据,但生成新的contextId。
*
* @return 新的分支context
*/
public FlowContext<T> fork() {
return this.convertData(this.data);
}

/**
* convertData
*
* @param <R> 转换后的数据类型
* @param data 转换后的数据
* @return 转换后的context
*/
public <R> FlowContext<R> convertData(R data) {
FlowContext<R> context = this.copyContextWithoutID(data);
context.previous = this.previous;
context.nextPositionId = this.nextPositionId;
return context;
}

/**
* 用于when.convert数据时候的转换context,除了包裹的数据类型不一样,所有其他信息都一样
*
Expand All @@ -274,12 +297,17 @@ public <R> List<FlowContext<R>> generate(List<R> data, String position) {
* @return 转换后的context
*/
public <R> FlowContext<R> convertData(R data, String id) {
FlowContext<R> context = this.copyContextWithoutID(data);
context.previous = this.previous;
Copy link
Copy Markdown
Contributor

@loveTsong loveTsong Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

看下copyContext整体调整后,这里应该不再需要为previous赋值

context.id = id;
return context;
}

private <R> FlowContext<R> copyContextWithoutID(R data) {
FlowContext<R> context = new FlowContext<>(this.streamId, this.rootId, data, this.traceId, this.position,
this.parallel, this.parallelMode, LocalDateTime.now());
context.previous = this.previous;
context.status = this.status;
context.trans = this.trans;
context.id = id;
Copy link
Copy Markdown
Contributor

@loveTsong loveTsong Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果是copyContext的含义,不应该删除previous和id的赋值

context.batchId = this.batchId;
context.toBatch = this.toBatch;
context.createAt = this.createAt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,16 @@ default void update(List<FlowContext<T>> contexts) {
}

/**
* updateToSent
* 更新context状态为SENT
*
* @param contexts contexts
* @param contexts 上下文列表
*/
void updateToSent(List<FlowContext<T>> contexts);

/**
* updateToReady
* 更新context状态为READY
*
* @param contexts contexts
* @param contexts 上下文列表
*/
void updateToReady(List<FlowContext<T>> contexts);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public FitStream.Publisher<FlowData> convertToFlow(FlowContextRepo<FlowData> rep
// startNode不能出现在event的to属性, endNode不能出现在event的from属性
FlowNode toNode = nodeMap.get(event.getTo());
fromNode.subscribe(streamId, flowEnv, toNode, event);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image 多余的空行

});
});
return getFlowNode(FlowNodeType.START).getPublisher(streamId, repo, messenger, locks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public class FlowForkNode extends FlowNode {
public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, FlowContextRepo<FlowData> repo,
FlowContextMessenger messenger, FlowLocks locks) {
if (!Optional.ofNullable(processor).isPresent()) {
this.processor = new Node<>(streamId, this.metaId, this::forkJuster, repo, messenger, locks, this.type);
this.processor = new Node<>(streamId, this.metaId, this::forkJuster, repo, messenger, locks, this.type,
FlowData.class);
this.processor.onError(errorHandler(streamId));
}
return this.processor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.FitStream;
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.nodes.Blocks;
import modelengine.fit.waterflow.flowsengine.domain.flows.streams.nodes.Node;
import modelengine.fit.waterflow.flowsengine.utils.FlowUtil;
import modelengine.fitframework.log.Logger;
import modelengine.fitframework.util.CollectionUtils;
import modelengine.fitframework.util.ObjectUtils;
import modelengine.fitframework.util.StringUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -54,10 +59,11 @@ public class FlowStateNode extends FlowNode {
*/
@Override
public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, FlowContextRepo<FlowData> repo,
FlowContextMessenger messenger, FlowLocks locks) {
FlowContextMessenger messenger, FlowLocks locks) {
if (!Optional.ofNullable(this.processor).isPresent()) {
Node<FlowData, FlowData> node = new Node<>(streamId, this.metaId, this::stateProduce, repo, messenger,
locks, this.type);
locks, this.type, FlowData.class);

if (!Objects.isNull(this.jober)) {
node.setIsAsyncJob(this.jober.isAsync());
}
Expand All @@ -74,6 +80,50 @@ public FitStream.Processor<FlowData, FlowData> getProcessor(String streamId, Flo
return this.processor;
}

private List<FlowContext<FlowData>> mergeProcessInputs(List<FlowContext<FlowData>> pre) {
if (CollectionUtils.isEmpty(pre) || pre.size() <= 1) {
return pre;
}
if (pre.stream().anyMatch(context -> !(context.getData() instanceof FlowData))) {
return pre;
}
if (pre.stream().map(FlowContext::getPosition).filter(StringUtils::isNotEmpty).distinct().count() <= 1) {
return pre;
}
FlowContext<FlowData> baseContext = pre.get(0);
FlowData mergedFlowData = mergeFlowData(pre, baseContext.getId());
return Collections.singletonList(
baseContext.convertData(ObjectUtils.cast(mergedFlowData), baseContext.getId()));
}

private FlowData mergeFlowData(List<FlowContext<FlowData>> pre, String baseContextId) {
FlowData first = pre.get(0).getData();
Map<String, Object> businessData = new HashMap<>(
Optional.ofNullable(first.getBusinessData()).orElseGet(HashMap::new));
Map<String, Object> contextData = new HashMap<>(
Optional.ofNullable(first.getContextData()).orElseGet(HashMap::new));
Map<String, Object> passData = new HashMap<>(Optional.ofNullable(first.getPassData()).orElseGet(HashMap::new));

pre.stream().skip(1).map(FlowContext::getData).forEach(flowData -> {
businessData.putAll(FlowUtil.mergeMaps(businessData,
Optional.ofNullable(flowData.getBusinessData()).orElseGet(HashMap::new)));
contextData.putAll(FlowUtil.mergeMaps(contextData,
Optional.ofNullable(flowData.getContextData()).orElseGet(HashMap::new)));
passData.putAll(FlowUtil.mergeMaps(passData,
Optional.ofNullable(flowData.getPassData()).orElseGet(HashMap::new)));
});
contextData.put(Constant.CONTEXT_ID, baseContextId);
return FlowData.builder()
.operator(first.getOperator())
.startTime(first.getStartTime())
.businessData(businessData)
.contextData(contextData)
.passData(passData)
.errorMessage(first.getErrorMessage())
.errorInfo(first.getErrorInfo())
.build();
}

private List<FlowData> stateProduce(List<FlowContext<FlowData>> inputs) {
addContextData(inputs);
List<FlowData> flowDataList = inputs.stream().map(FlowContext::getData).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package modelengine.fit.waterflow.flowsengine.domain.flows.streams;

import modelengine.fit.waterflow.flowsengine.domain.flows.context.FlowContext;
import modelengine.fit.waterflow.flowsengine.domain.flows.context.FlowData;
import modelengine.fit.waterflow.flowsengine.utils.FlowUtil;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* FlowData 类型数据的多输入合并器
* 用于 fan-in 场景下将多条 FlowData 输入合并为单条处理
*
* @author 高诗意
* @since 2023/08/14
*/
public class FlowDataMerger implements Processors.Merger<FlowData> {

@Override
public FlowContext<FlowData> merge(List<FlowContext<FlowData>> contexts) {
if (contexts == null || contexts.isEmpty()) {
return null;
}
FlowContext<FlowData> baseContext = contexts.get(0);
FlowData mergedFlowData = mergeFlowData(contexts);
return baseContext.convertData(mergedFlowData, baseContext.getId());
}

private FlowData mergeFlowData(List<FlowContext<FlowData>> contexts) {
FlowData first = contexts.get(0).getData();
Map<String, Object> businessData = new HashMap<>(
Optional.ofNullable(first.getBusinessData()).orElseGet(HashMap::new));
Map<String, Object> contextData = new HashMap<>(
Optional.ofNullable(first.getContextData()).orElseGet(HashMap::new));
Map<String, Object> passData = new HashMap<>(
Optional.ofNullable(first.getPassData()).orElseGet(HashMap::new));

contexts.stream().skip(1).map(FlowContext::getData).forEach(flowData -> {
businessData.putAll(FlowUtil.mergeMaps(businessData,
Optional.ofNullable(flowData.getBusinessData()).orElseGet(HashMap::new)));
contextData.putAll(FlowUtil.mergeMaps(contextData,
Optional.ofNullable(flowData.getContextData()).orElseGet(HashMap::new)));
passData.putAll(FlowUtil.mergeMaps(passData,
Optional.ofNullable(flowData.getPassData()).orElseGet(HashMap::new)));
});
return FlowData.builder()
.operator(first.getOperator())
.startTime(first.getStartTime())
.businessData(businessData)
.contextData(contextData)
.passData(passData)
.errorMessage(first.getErrorMessage())
.errorInfo(first.getErrorInfo())
.build();
}
}
Loading
Loading