5858import java .util .List ;
5959import java .util .Map ;
6060import java .util .Objects ;
61+ import java .util .concurrent .locks .Lock ;
62+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
63+ import java .util .function .Supplier ;
6164import java .util .stream .Collectors ;
6265import java .util .stream .Stream ;
6366
@@ -79,20 +82,18 @@ public final class CompiledPipeline {
7982 */
8083 private final EventCondition .Compiler conditionalCompiler = new EventCondition .Compiler ();
8184
82- /**
83- * Configured inputs.
84- */
85- private final Collection <IRubyObject > inputs ;
85+ private final InputStage inputStage ;
8686
87- /**
88- * Configured Filters, indexed by their ID as returned by {@link PluginVertex#getId()}.
89- */
90- private final Map <String , AbstractFilterDelegatorExt > filters ;
87+ private final WorkerStage workerStage ;
9188
92- /**
93- * Configured outputs.
94- */
95- private final Map <String , AbstractOutputDelegatorExt > outputs ;
89+ private final Lock workerStageReadLock ;
90+ private final Lock workerStageWriteLock ;
91+
92+ {
93+ final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock ();
94+ workerStageReadLock = readWriteLock .readLock ();
95+ workerStageWriteLock = readWriteLock .writeLock ();
96+ }
9697
9798 /**
9899 * Parsed pipeline configuration graph.
@@ -128,6 +129,50 @@ public void notify(ConditionalEvaluationError err) {
128129 }
129130 }
130131
132+ public class InputStage {
133+ final Collection <IRubyObject > inputs ;
134+
135+ InputStage (final ConfigVariableExpander configVariableExpander ) {
136+ try {
137+ this .inputs = setupInputs (configVariableExpander );
138+ } catch (final Exception e ) {
139+ throw new IllegalStateException ("Unable to configure plugins for input stage: " + e .getMessage (), e );
140+ }
141+ }
142+ }
143+
144+ public class WorkerStage {
145+ final Map <String , AbstractFilterDelegatorExt > filters ;
146+ final Map <String , AbstractOutputDelegatorExt > outputs ;
147+
148+ WorkerStage (final ConfigVariableExpander configVariableExpander ) {
149+ try {
150+ this .filters = Map .copyOf (setupFilters (configVariableExpander ));
151+ this .outputs = Map .copyOf (setupOutputs (configVariableExpander ));
152+ } catch (final Exception e ) {
153+ throw new IllegalStateException ("Unable to configure plugins for worker stage: " + e .getMessage (), e );
154+ }
155+ }
156+
157+ public Collection <AbstractOutputDelegatorExt > getOutputs () {
158+ return outputs .values ();
159+ }
160+
161+ public Collection <AbstractFilterDelegatorExt > getFilters () {
162+ return filters .values ();
163+ }
164+
165+ public CompiledExecution buildExecution () {
166+ return buildExecution (false );
167+ }
168+
169+ public CompiledExecution buildExecution (final boolean orderedExecution ) {
170+ return orderedExecution
171+ ? new CompiledPipeline .CompiledOrderedExecution (this )
172+ : new CompiledPipeline .CompiledUnorderedExecution (this );
173+ }
174+ }
175+
131176 public CompiledPipeline (
132177 final PipelineIR pipelineIR ,
133178 final RubyIntegration .PluginFactory pluginFactory )
@@ -160,24 +205,36 @@ public CompiledPipeline(
160205 try (ConfigVariableExpander cve = new ConfigVariableExpander (
161206 secretStore ,
162207 EnvironmentVariableProvider .defaultProvider ())) {
163- inputs = setupInputs (cve );
164- filters = setupFilters (cve );
165- outputs = setupOutputs (cve );
208+ this .inputStage = new InputStage (cve );
209+ this .workerStage = withLock (workerStageWriteLock , () -> new WorkerStage (cve ));
166210 } catch (Exception e ) {
167211 throw new IllegalStateException ("Unable to configure plugins: " + e .getMessage (), e );
168212 }
169213 }
170214
215+ public WorkerStage workerStage () {
216+ return withLock (workerStageReadLock , () -> workerStage );
217+ }
218+
219+ static <T > T withLock (final Lock lock , final Supplier <T > supplier ) {
220+ lock .lock ();
221+ try {
222+ return supplier .get ();
223+ } finally {
224+ lock .unlock ();
225+ }
226+ }
227+
171228 public Collection <AbstractOutputDelegatorExt > outputs () {
172- return Collections .unmodifiableCollection (outputs .values ());
229+ return withLock ( workerStageReadLock , () -> Collections .unmodifiableCollection (this . workerStage . outputs .values () ));
173230 }
174231
175232 public Collection <AbstractFilterDelegatorExt > filters () {
176- return Collections .unmodifiableCollection (filters .values ());
233+ return withLock ( workerStageReadLock , () -> Collections .unmodifiableCollection (this . workerStage . filters .values () ));
177234 }
178235
179236 public Collection <IRubyObject > inputs () {
180- return Collections .unmodifiableCollection (inputs );
237+ return Collections .unmodifiableCollection (this . inputStage . inputs );
181238 }
182239
183240 public int getConfiguredBatchSize () {
@@ -194,6 +251,7 @@ public int getOutputChunkingGrowthThresholdFactor() {
194251 * unordered execution model.
195252 * @return CompiledPipeline.CompiledExecution the compiled pipeline
196253 */
254+ @ Deprecated // use WorkerStage#buildExecution
197255 public CompiledPipeline .CompiledExecution buildExecution () {
198256 return buildExecution (false );
199257 }
@@ -205,10 +263,11 @@ public CompiledPipeline.CompiledExecution buildExecution() {
205263 * @param orderedExecution determines whether to build an execution that enforces order or not
206264 * @return CompiledPipeline.CompiledExecution the compiled pipeline
207265 */
266+ @ Deprecated // use WorkerStage#buildExecution
208267 public CompiledPipeline .CompiledExecution buildExecution (boolean orderedExecution ) {
209268 return orderedExecution
210- ? new CompiledPipeline .CompiledOrderedExecution ()
211- : new CompiledPipeline .CompiledUnorderedExecution ();
269+ ? new CompiledPipeline .CompiledOrderedExecution (this . workerStage )
270+ : new CompiledPipeline .CompiledUnorderedExecution (this . workerStage );
212271 }
213272
214273 /**
@@ -334,24 +393,6 @@ public static Object expandConfigVariableKeepingSecrets(ConfigVariableExpander c
334393 return expandConfigVariable (cve , valueToExpand , true );
335394 }
336395
337- /**
338- * Checks if a certain {@link Vertex} represents a {@link AbstractFilterDelegatorExt}.
339- * @param vertex Vertex to check
340- * @return True iff {@link Vertex} represents a {@link AbstractFilterDelegatorExt}
341- */
342- private boolean isFilter (final Vertex vertex ) {
343- return filters .containsKey (vertex .getId ());
344- }
345-
346- /**
347- * Checks if a certain {@link Vertex} represents an output.
348- * @param vertex Vertex to check
349- * @return True iff {@link Vertex} represents an output
350- */
351- private boolean isOutput (final Vertex vertex ) {
352- return outputs .containsKey (vertex .getId ());
353- }
354-
355396 @ SuppressWarnings ("unchecked" )
356397 private int chunker (int batchInputSize , RubyArray <RubyEvent > filteredBatch , java .util .function .BiConsumer <RubyArray <RubyEvent >, Boolean > consumer ) {
357398 final int totalSize = filteredBatch .size ();
@@ -383,6 +424,10 @@ public final class CompiledOrderedExecution extends CompiledExecution {
383424
384425 @ SuppressWarnings ({"unchecked" }) private final RubyArray <RubyEvent > EMPTY_ARRAY = (RubyArray <RubyEvent >) Create .newEmptyArray (RubyUtil .RUBY .getCurrentContext ());
385426
427+ CompiledOrderedExecution (WorkerStage workerStage ) {
428+ super (workerStage );
429+ }
430+
386431 @ Override
387432 public int compute (final QueueBatch batch , final boolean flush , final boolean shutdown ) {
388433 return compute (batch .events (), flush , shutdown );
@@ -420,6 +465,10 @@ public int compute(final Collection<RubyEvent> batch, final boolean flush, final
420465
421466 public final class CompiledUnorderedExecution extends CompiledExecution {
422467
468+ CompiledUnorderedExecution (WorkerStage workerStage ) {
469+ super (workerStage );
470+ }
471+
423472 @ Override
424473 public int compute (final QueueBatch batch , final boolean flush , final boolean shutdown ) {
425474 return compute (batch .events (), flush , shutdown );
@@ -466,10 +515,13 @@ public abstract class CompiledExecution implements Execution<QueueBatch> {
466515 */
467516 private final Map <String , Dataset > plugins = new HashMap <>(50 );
468517
518+ private final WorkerStage workerStage ;
519+
469520 protected final Dataset compiledFilters ;
470521 protected final Dataset compiledOutputs ;
471522
472- CompiledExecution () {
523+ CompiledExecution (final WorkerStage workerStage ) {
524+ this .workerStage = workerStage ;
473525 compiledFilters = compileFilters ();
474526 compiledOutputs = compileOutputs ();
475527 }
@@ -495,7 +547,7 @@ private Dataset compileFilters() {
495547 */
496548 private Dataset compileOutputs () {
497549 final Collection <Vertex > outputNodes = pipelineIR .getGraph ()
498- .allLeaves ().filter (CompiledPipeline . this ::isOutput )
550+ .allLeaves ().filter (this ::isOutput )
499551 .collect (Collectors .toList ());
500552 if (outputNodes .isEmpty ()) {
501553 return Dataset .IDENTITY ;
@@ -520,7 +572,7 @@ private Dataset filterDataset(final Vertex vertex, final Collection<Dataset> dat
520572 final ComputeStepSyntaxElement <Dataset > prepared =
521573 DatasetCompiler .filterDataset (
522574 flatten (datasets , vertex ),
523- filters .get (vertexId )
575+ workerStage . filters .get (vertexId )
524576 );
525577
526578 plugins .put (vertexId , prepared .instantiate ());
@@ -543,8 +595,8 @@ private Dataset outputDataset(final Vertex vertex, final Collection<Dataset> dat
543595 final ComputeStepSyntaxElement <Dataset > prepared =
544596 DatasetCompiler .outputDataset (
545597 flatten (datasets , vertex ),
546- outputs .get (vertexId ),
547- outputs .size () == 1
598+ workerStage . outputs .get (vertexId ),
599+ workerStage . outputs .size () == 1
548600 );
549601
550602 plugins .put (vertexId , prepared .instantiate ());
@@ -646,5 +698,23 @@ private Collection<Dataset> compileDependencies(
646698 }
647699 ).collect (Collectors .toList ());
648700 }
701+
702+ /**
703+ * Checks if a certain {@link Vertex} represents a {@link AbstractFilterDelegatorExt}.
704+ * @param vertex Vertex to check
705+ * @return True iff {@link Vertex} represents a {@link AbstractFilterDelegatorExt}
706+ */
707+ private boolean isFilter (final Vertex vertex ) {
708+ return this .workerStage .filters .containsKey (vertex .getId ());
709+ }
710+
711+ /**
712+ * Checks if a certain {@link Vertex} represents an output.
713+ * @param vertex Vertex to check
714+ * @return True iff {@link Vertex} represents an output
715+ */
716+ private boolean isOutput (final Vertex vertex ) {
717+ return this .workerStage .outputs .containsKey (vertex .getId ());
718+ }
649719 }
650720}
0 commit comments