Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

77 changes: 43 additions & 34 deletions posthog/hogql/database/schema/test/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
from posthog.hogql.query import execute_hogql_query

from posthog.temporal.data_imports.sources.stripe.constants import (
CHARGE_RESOURCE_NAME as STRIPE_CHARGE_RESOURCE_NAME,
CUSTOMER_RESOURCE_NAME as STRIPE_CUSTOMER_RESOURCE_NAME,
INVOICE_RESOURCE_NAME as STRIPE_INVOICE_RESOURCE_NAME,
SUBSCRIPTION_RESOURCE_NAME as STRIPE_SUBSCRIPTION_RESOURCE_NAME,
)

from products.data_warehouse.backend.models import (
Expand All @@ -25,13 +27,17 @@
from products.data_warehouse.backend.test.utils import create_data_warehouse_table_from_csv
from products.data_warehouse.backend.types import DataWarehouseManagedViewSetKind
from products.revenue_analytics.backend.hogql_queries.test.data.structure import (
STRIPE_CHARGE_COLUMNS,
STRIPE_CUSTOMER_COLUMNS,
STRIPE_INVOICE_COLUMNS,
STRIPE_SUBSCRIPTION_COLUMNS,
)

# Base name shared by every per-resource S3 test bucket defined below.
TEST_BUCKET_BASE = "test_storage_bucket"
# One distinct bucket per Stripe resource so concurrently-running tests
# never write their CSV-backed table data into each other's prefix.
INVOICES_TEST_BUCKET = f"{TEST_BUCKET_BASE}-posthog.revenue_analytics.insights_query_runner.stripe_invoices"
CUSTOMERS_TEST_BUCKET = f"{TEST_BUCKET_BASE}-posthog.revenue_analytics.insights_query_runner.stripe_customers"
SUBSCRIPTIONS_TEST_BUCKET = f"{TEST_BUCKET_BASE}-posthog.revenue_analytics.insights_query_runner.stripe_subscriptions"
CHARGES_TEST_BUCKET = f"{TEST_BUCKET_BASE}-posthog.revenue_analytics.insights_query_runner.stripe_charges"
_TEST_DATA_DIR = (
Path(__file__).resolve().parents[5]
/ "products"
Expand All @@ -53,54 +59,57 @@ class RevenueAnalyticsTestBase(ClickhouseTestMixin, BaseTest):
PERSON_ID = "00000000-0000-0000-0000-000000000000"
DISTINCT_ID = "distinct_id"

# Maps a short resource key to the tuple
# (csv fixture stem, column schema, S3 bucket, external-data schema name).
# Read by create_source_table() to drive per-resource test-table setup.
_TABLE_CONFIGS: dict[str, tuple[str, dict, str, str]] = {
    "invoice": ("stripe_invoices", STRIPE_INVOICE_COLUMNS, INVOICES_TEST_BUCKET, STRIPE_INVOICE_RESOURCE_NAME),
    "customer": ("stripe_customers", STRIPE_CUSTOMER_COLUMNS, CUSTOMERS_TEST_BUCKET, STRIPE_CUSTOMER_RESOURCE_NAME),
    "subscription": (
        "stripe_subscriptions",
        STRIPE_SUBSCRIPTION_COLUMNS,
        SUBSCRIPTIONS_TEST_BUCKET,
        STRIPE_SUBSCRIPTION_RESOURCE_NAME,
    ),
    "charge": ("stripe_charges", STRIPE_CHARGE_COLUMNS, CHARGES_TEST_BUCKET, STRIPE_CHARGE_RESOURCE_NAME),
}

def tearDown(self):
    """Run every filesystem cleanup registered during the test, then defer to the base class.

    create_source_table() appends one cleanup callable per table it creates to
    ``self._source_cleanups``; the legacy per-attribute checks
    (``invoices_cleanup_filesystem`` / ``customers_cleanup_filesystem``) are
    superseded by that list and removed here.
    """
    # getattr with a default handles tests that never called create_sources().
    for cleanup in getattr(self, "_source_cleanups", []):
        cleanup()
    super().tearDown()

def create_source_table(self, key: str) -> None:
    """Create one Stripe-backed warehouse table (plus its ExternalDataSchema) from a CSV fixture.

    ``key`` must be a key of ``_TABLE_CONFIGS`` ("invoice", "customer",
    "subscription", "charge"). The first call creates the external data source
    and credential; later calls reuse them via ``getattr`` so every table hangs
    off the same source. Each call registers its filesystem cleanup callable so
    tearDown() can remove the uploaded data.
    """
    csv_name, columns, bucket, schema_name = self._TABLE_CONFIGS[key]

    table, source, credential, _, cleanup = create_data_warehouse_table_from_csv(
        _TEST_DATA_DIR / f"{csv_name}.csv",
        f"stripe_{key}",
        columns,
        bucket,
        self.team,
        # None on the first call tells the helper to create a fresh
        # source/credential; subsequent calls pass the ones we stored below.
        source=getattr(self, "source", None),
        credential=getattr(self, "_credential", None),
    )

    # Remember the source/credential created on the first call so all
    # subsequent tables attach to the same external data source.
    if not hasattr(self, "source"):
        self.source = source
    self._credential = credential

    # Collect cleanups for tearDown(); the list may not exist yet.
    if not hasattr(self, "_source_cleanups"):
        self._source_cleanups = []
    self._source_cleanups.append(cleanup)

    ExternalDataSchema.objects.create(
        team=self.team,
        name=schema_name,
        source=self.source,
        table=table,
        should_sync=True,
        last_synced_at="2024-01-01",
    )

def create_sources(self):
    """Create the warehouse tables the base revenue-analytics tests rely on.

    Only the invoice and customer tables are created by default; the
    subscription and charge entries in ``_TABLE_CONFIGS`` are opt-in via
    direct calls to create_source_table().
    """
    for resource_key in ("invoice", "customer"):
        self.create_source_table(resource_key)


class RevenueAnalyticsManagedViewsetsTestMixin(RevenueAnalyticsTestBase):
def setUp(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import pytest
from posthog.test.base import BaseTest
from freezegun import freeze_time
from posthog.test.base import BaseTest, _create_person

from parameterized import parameterized

from posthog.schema import CurrencyCode

from posthog.hogql.database.schema.test.base import RevenueAnalyticsTestBase
from posthog.hogql.parser import parse_select
from posthog.hogql.query import execute_hogql_query

from products.data_warehouse.backend.data_load.source_templates import _revenue_view_name, database_operations
from products.data_warehouse.backend.models.join import DataWarehouseJoin

Expand Down Expand Up @@ -92,3 +99,37 @@ def test_recreates_after_soft_delete(self):
assert DataWarehouseJoin.objects.filter(
team=self.team, joining_table_name="stripe_customer", deleted=True
).exists()


class TestCustomerRevenueViewPersonsJoin(RevenueAnalyticsTestBase):
    """End-to-end check that the joins created by database_operations resolve
    when revenue analytics queries traverse them."""

    def setUp(self):
        super().setUp()
        self.create_sources()
        # The view under test is currency-aware, so pin the team currency
        # before resolving the view name.
        self.team.base_currency = CurrencyCode.GBP.value
        self.team.save()
        self.view_name = _revenue_view_name(self.source.prefix or "")

    def test_persons_join_resolves_on_customer_view(self):
        # Person whose distinct id should line up with the customer fixture row.
        _create_person(
            team_id=self.team.pk,
            distinct_ids=["person_cus_1"],
            properties={"marker": "found"},
        )

        # Create the joins under test.
        database_operations(self.team.pk, self.source.prefix or "")

        query = (
            f"SELECT id, persons.properties.marker FROM {self.view_name}"
            f" WHERE persons.properties.marker IS NOT NULL ORDER BY id"
        )
        with freeze_time(self.QUERY_TIMESTAMP):
            response = execute_hogql_query(
                parse_select(query),
                self.team,
                modifiers=self.MODIFIERS,
            )

        # Exactly one row: the joined person's marker surfaced on the view.
        assert [(row[0], row[1]) for row in response.results] == [("cus_1", "found")]
Loading
Loading