diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java index 2411f77afe3..bb98c83b114 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java @@ -120,6 +120,13 @@ public IRubyObject configName(final ThreadContext context) { protected abstract IRubyObject getConfigName(ThreadContext context); + @JRubyMethod(name = "ruby_plugin") + public IRubyObject rubyPlugin(final ThreadContext context) { + return getRubyPlugin(context); + } + + protected abstract IRubyObject getRubyPlugin(final ThreadContext context); + @JRubyMethod(name = "id") public IRubyObject getId() { return id; diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java index efcd66f6635..7fdd195034e 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/AbstractOutputDelegatorExt.java @@ -137,6 +137,13 @@ protected void initMetrics(final String id, final AbstractMetricExt metric) { } } + @JRubyMethod(name = "ruby_plugin") + public IRubyObject rubyPlugin(final ThreadContext context) { + return getRubyPlugin(context); + } + + protected abstract IRubyObject getRubyPlugin(ThreadContext context); + protected abstract IRubyObject getConfigName(ThreadContext context); protected abstract IRubyObject getConcurrency(ThreadContext context); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java index 0e6ee681c97..3e5bfb2c7c0 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java @@ -84,6 +84,11 @@ public FilterDelegatorExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); } + @Override + protected IRubyObject getRubyPlugin(final ThreadContext context) { + return filter; + } + @Override protected void doRegister(final ThreadContext context) { filter.callMethod(context, "register"); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java index 989098290ed..2ff9573d75d 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaFilterDelegatorExt.java @@ -87,6 +87,11 @@ protected RubyArray doMultiFilter(final RubyArray batch) { return newBatch; } + @Override + protected IRubyObject getRubyPlugin(final ThreadContext context) { + return context.nil; + } + @Override protected void doRegister(ThreadContext context) { } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaOutputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaOutputDelegatorExt.java index 31304e31f9a..92b50657ed7 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaOutputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaOutputDelegatorExt.java @@ -125,6 +125,11 @@ protected void doRegister(final ThreadContext context) { registerAction.run(); } + @Override + protected IRubyObject getRubyPlugin(final ThreadContext context) { + return context.nil; + } + @Override protected IRubyObject reloadable(final ThreadContext context) { return context.tru; diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java index bea5d5bab5c..29ae8142433 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputDelegatorExt.java @@ -126,6 +126,11 @@ protected void doRegister(final ThreadContext context) { strategy.register(context); } + @Override + protected IRubyObject getRubyPlugin(final ThreadContext context) { + return strategy.getRubyPlugin(context); + } + @Override protected IRubyObject reloadable(final ThreadContext context) { return outputClass.callMethod(context, "reloadable?"); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java index cbca2c29915..e2475529497 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/OutputStrategyExt.java @@ -20,9 +20,6 @@ package org.logstash.config.ir.compiler; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.stream.Collectors; import org.jruby.Ruby; import org.jruby.RubyArray; import org.jruby.RubyClass; @@ -34,14 +31,15 @@ import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.internal.runtime.methods.DynamicMethod; -import org.jruby.runtime.Block; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; import org.logstash.execution.ExecutionContextExt; import org.logstash.plugins.factory.ContextualizerExt; -import static org.logstash.RubyUtil.PLUGIN_CONTEXTUALIZER_MODULE; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; public final class OutputStrategyExt { @@ -123,6 +121,8 @@ public abstract static class AbstractOutputStrategyExt extends RubyObject { private RubyClass outputClass; + public abstract IRubyObject getRubyPlugin(final ThreadContext context); + public AbstractOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); } @@ -214,6 +214,11 @@ public IRubyObject workers() { return workers; } + @Override + public IRubyObject getRubyPlugin(final ThreadContext context) { + return workers.isEmpty() ? context.nil : (IRubyObject) workers.get(0); + } + @Override protected IRubyObject output(final ThreadContext context, final IRubyObject events) throws InterruptedException { final IRubyObject worker = workerQueue.take(); @@ -267,6 +272,11 @@ public IRubyObject initialize(final ThreadContext context, final IRubyObject[] a return this; } + @Override + public IRubyObject getRubyPlugin(final ThreadContext context) { + return output; + } + @Override protected final IRubyObject close(final ThreadContext context) { return output.callMethod(context, "do_close"); diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java index 7e803ddb35e..5c378d571ac 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/OutputDelegatorTest.java @@ -33,7 +33,6 @@ import org.junit.Ignore; import org.junit.Test; import org.logstash.Event; -import org.logstash.instrument.metrics.MetricKeys; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; @@ -137,6 +136,13 @@ public void closesOutputPlugin() { assertEquals(1, FakeOutClass.latestInstance.getCloseCallCount()); } + @Test + public void javaOutputDelegatorReturnsNilForRubyPlugin() { + JavaOutputDelegatorExt delegator = JavaOutputDelegatorExt.create( + "test_plugin", "test_id", metric, events -> {}, () -> {}, () -> {}); + assertThat(delegator.rubyPlugin(RUBY.getCurrentContext()).isNil()).isTrue(); + } + @Test public void singleConcurrencyStrategyIsDefault() { OutputDelegatorExt outputDelegator = constructOutputDelegator(); @@ -166,6 +172,11 @@ public void outputStrategyTests() { // test that metrics are properly set on the instance assertEquals(outputDelegator.namespacedMetric(), FakeOutClass.latestInstance.getMetricArgs()); + + // test that rubyPlugin returns the inner plugin instance + IRubyObject rubyPlugin = outputDelegator.rubyPlugin(RUBY.getCurrentContext()); + assertThat(rubyPlugin).isInstanceOf(FakeOutClass.class); + assertThat(rubyPlugin.isNil()).isFalse(); } }