5454import java .util .List ;
5555import java .util .Map ;
5656import java .util .Objects ;
57+ import java .util .concurrent .locks .Lock ;
58+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
59+ import java .util .function .Supplier ;
5760import java .util .stream .Collectors ;
5861import java .util .stream .Stream ;
5962
@@ -73,20 +76,18 @@ public final class CompiledPipeline {
7376 */
7477 private final EventCondition .Compiler conditionalCompiler = new EventCondition .Compiler ();
7578
76- /**
77- * Configured inputs.
78- */
79- private final Collection <IRubyObject > inputs ;
79+ private final InputStage inputStage ;
8080
81- /**
82- * Configured Filters, indexed by their ID as returned by {@link PluginVertex#getId()}.
83- */
84- private final Map <String , AbstractFilterDelegatorExt > filters ;
81+ private final WorkerStage workerStage ;
8582
86- /**
87- * Configured outputs.
88- */
89- private final Map <String , AbstractOutputDelegatorExt > outputs ;
83+ private final Lock workerStageReadLock ;
84+ private final Lock workerStageWriteLock ;
85+
86+ {
87+ final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock ();
88+ workerStageReadLock = readWriteLock .readLock ();
89+ workerStageWriteLock = readWriteLock .writeLock ();
90+ }
9091
9192 /**
9293 * Parsed pipeline configuration graph.
@@ -111,6 +112,50 @@ public void notify(ConditionalEvaluationError err) {
111112 }
112113 }
113114
115+ public class InputStage {
116+ final Collection <IRubyObject > inputs ;
117+
118+ InputStage (final ConfigVariableExpander configVariableExpander ) {
119+ try {
120+ this .inputs = setupInputs (configVariableExpander );
121+ } catch (final Exception e ) {
122+ throw new IllegalStateException ("Unable to configure plugins for input stage: " + e .getMessage (), e );
123+ }
124+ }
125+ }
126+
127+ public class WorkerStage {
128+ final Map <String , AbstractFilterDelegatorExt > filters ;
129+ final Map <String , AbstractOutputDelegatorExt > outputs ;
130+
131+ WorkerStage (final ConfigVariableExpander configVariableExpander ) {
132+ try {
133+ this .filters = Map .copyOf (setupFilters (configVariableExpander ));
134+ this .outputs = Map .copyOf (setupOutputs (configVariableExpander ));
135+ } catch (final Exception e ) {
136+ throw new IllegalStateException ("Unable to configure plugins for worker stage: " + e .getMessage (), e );
137+ }
138+ }
139+
140+ public Collection <AbstractOutputDelegatorExt > getOutputs () {
141+ return outputs .values ();
142+ }
143+
144+ public Collection <AbstractFilterDelegatorExt > getFilters () {
145+ return filters .values ();
146+ }
147+
148+ public CompiledExecution buildExecution () {
149+ return buildExecution (false );
150+ }
151+
152+ public CompiledExecution buildExecution (final boolean orderedExecution ) {
153+ return orderedExecution
154+ ? new CompiledPipeline .CompiledOrderedExecution (this )
155+ : new CompiledPipeline .CompiledUnorderedExecution (this );
156+ }
157+ }
158+
114159 public CompiledPipeline (
115160 final PipelineIR pipelineIR ,
116161 final RubyIntegration .PluginFactory pluginFactory )
@@ -130,24 +175,36 @@ public CompiledPipeline(
130175 try (ConfigVariableExpander cve = new ConfigVariableExpander (
131176 secretStore ,
132177 EnvironmentVariableProvider .defaultProvider ())) {
133- inputs = setupInputs (cve );
134- filters = setupFilters (cve );
135- outputs = setupOutputs (cve );
178+ this .inputStage = new InputStage (cve );
179+ this .workerStage = withLock (workerStageWriteLock , () -> new WorkerStage (cve ));
136180 } catch (Exception e ) {
137181 throw new IllegalStateException ("Unable to configure plugins: " + e .getMessage (), e );
138182 }
139183 }
140184
185+ public WorkerStage workerStage () {
186+ return withLock (workerStageReadLock , () -> workerStage );
187+ }
188+
189+ static <T > T withLock (final Lock lock , final Supplier <T > supplier ) {
190+ lock .lock ();
191+ try {
192+ return supplier .get ();
193+ } finally {
194+ lock .unlock ();
195+ }
196+ }
197+
141198 public Collection <AbstractOutputDelegatorExt > outputs () {
142- return Collections .unmodifiableCollection (outputs .values ());
199+ return withLock ( workerStageReadLock , () -> Collections .unmodifiableCollection (this . workerStage . outputs .values () ));
143200 }
144201
145202 public Collection <AbstractFilterDelegatorExt > filters () {
146- return Collections .unmodifiableCollection (filters .values ());
203+ return withLock ( workerStageReadLock , () -> Collections .unmodifiableCollection (this . workerStage . filters .values () ));
147204 }
148205
149206 public Collection <IRubyObject > inputs () {
150- return Collections .unmodifiableCollection (inputs );
207+ return Collections .unmodifiableCollection (this . inputStage . inputs );
151208 }
152209
153210 /**
@@ -156,6 +213,7 @@ public Collection<IRubyObject> inputs() {
156213 * unordered execution model.
157214 * @return CompiledPipeline.CompiledExecution the compiled pipeline
158215 */
216+ @ Deprecated // use WorkerStage#buildExecution
159217 public CompiledPipeline .CompiledExecution buildExecution () {
160218 return buildExecution (false );
161219 }
@@ -167,10 +225,11 @@ public CompiledPipeline.CompiledExecution buildExecution() {
167225 * @param orderedExecution determines whether to build an execution that enforces order or not
168226 * @return CompiledPipeline.CompiledExecution the compiled pipeline
169227 */
228+ @ Deprecated // use WorkerStage#buildExecution
170229 public CompiledPipeline .CompiledExecution buildExecution (boolean orderedExecution ) {
171230 return orderedExecution
172- ? new CompiledPipeline .CompiledOrderedExecution ()
173- : new CompiledPipeline .CompiledUnorderedExecution ();
231+ ? new CompiledPipeline .CompiledOrderedExecution (this . workerStage )
232+ : new CompiledPipeline .CompiledUnorderedExecution (this . workerStage );
174233 }
175234
176235 /**
@@ -296,28 +355,14 @@ public static Object expandConfigVariableKeepingSecrets(ConfigVariableExpander c
296355 return expandConfigVariable (cve , valueToExpand , true );
297356 }
298357
299- /**
300- * Checks if a certain {@link Vertex} represents a {@link AbstractFilterDelegatorExt}.
301- * @param vertex Vertex to check
302- * @return True iff {@link Vertex} represents a {@link AbstractFilterDelegatorExt}
303- */
304- private boolean isFilter (final Vertex vertex ) {
305- return filters .containsKey (vertex .getId ());
306- }
307-
308- /**
309- * Checks if a certain {@link Vertex} represents an output.
310- * @param vertex Vertex to check
311- * @return True iff {@link Vertex} represents an output
312- */
313- private boolean isOutput (final Vertex vertex ) {
314- return outputs .containsKey (vertex .getId ());
315- }
316-
317358 public final class CompiledOrderedExecution extends CompiledExecution {
318359
319360 @ SuppressWarnings ({"unchecked" }) private final RubyArray <RubyEvent > EMPTY_ARRAY = RubyUtil .RUBY .newEmptyArray ();
320361
362+ CompiledOrderedExecution (WorkerStage workerStage ) {
363+ super (workerStage );
364+ }
365+
321366 @ Override
322367 public int compute (final QueueBatch batch , final boolean flush , final boolean shutdown ) {
323368 return compute (batch .events (), flush , shutdown );
@@ -353,6 +398,10 @@ private void _compute(final RubyArray<RubyEvent> batch, final RubyArray<RubyEven
353398
354399 public final class CompiledUnorderedExecution extends CompiledExecution {
355400
401+ CompiledUnorderedExecution (WorkerStage workerStage ) {
402+ super (workerStage );
403+ }
404+
356405 @ Override
357406 public int compute (final QueueBatch batch , final boolean flush , final boolean shutdown ) {
358407 return compute (batch .events (), flush , shutdown );
@@ -396,10 +445,13 @@ public abstract class CompiledExecution implements Execution<QueueBatch> {
396445 */
397446 private final Map <String , Dataset > plugins = new HashMap <>(50 );
398447
448+ private final WorkerStage workerStage ;
449+
399450 protected final Dataset compiledFilters ;
400451 protected final Dataset compiledOutputs ;
401452
402- CompiledExecution () {
453+ CompiledExecution (final WorkerStage workerStage ) {
454+ this .workerStage = workerStage ;
403455 compiledFilters = compileFilters ();
404456 compiledOutputs = compileOutputs ();
405457 }
@@ -425,7 +477,7 @@ private Dataset compileFilters() {
425477 */
426478 private Dataset compileOutputs () {
427479 final Collection <Vertex > outputNodes = pipelineIR .getGraph ()
428- .allLeaves ().filter (CompiledPipeline . this ::isOutput )
480+ .allLeaves ().filter (this ::isOutput )
429481 .collect (Collectors .toList ());
430482 if (outputNodes .isEmpty ()) {
431483 return Dataset .IDENTITY ;
@@ -450,7 +502,7 @@ private Dataset filterDataset(final Vertex vertex, final Collection<Dataset> dat
450502 final ComputeStepSyntaxElement <Dataset > prepared =
451503 DatasetCompiler .filterDataset (
452504 flatten (datasets , vertex ),
453- filters .get (vertexId )
505+ workerStage . filters .get (vertexId )
454506 );
455507
456508 plugins .put (vertexId , prepared .instantiate ());
@@ -473,8 +525,8 @@ private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> dat
473525 final ComputeStepSyntaxElement <Dataset > prepared =
474526 DatasetCompiler .outputDataset (
475527 flatten (datasets , vertex ),
476- outputs .get (vertexId ),
477- outputs .size () == 1
528+ workerStage . outputs .get (vertexId ),
529+ workerStage . outputs .size () == 1
478530 );
479531
480532 plugins .put (vertexId , prepared .instantiate ());
@@ -576,5 +628,23 @@ private Collection<Dataset> compileDependencies(
576628 }
577629 ).collect (Collectors .toList ());
578630 }
631+
632+ /**
633+ * Checks if a certain {@link Vertex} represents a {@link AbstractFilterDelegatorExt}.
634+ * @param vertex Vertex to check
635+ * @return True iff {@link Vertex} represents a {@link AbstractFilterDelegatorExt}
636+ */
637+ private boolean isFilter (final Vertex vertex ) {
638+ return this .workerStage .filters .containsKey (vertex .getId ());
639+ }
640+
641+ /**
642+ * Checks if a certain {@link Vertex} represents an output.
643+ * @param vertex Vertex to check
644+ * @return True iff {@link Vertex} represents an output
645+ */
646+ private boolean isOutput (final Vertex vertex ) {
647+ return this .workerStage .outputs .containsKey (vertex .getId ());
648+ }
579649 }
580650}
0 commit comments