Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ We recommend using [uv](https://docs.astral.sh/uv/). It's super fast.
```bash
uv python install 3.9.19
uv python pin 3.9.19
uv venv env
uv venv
source env/bin/activate
uv sync --extra dev --extra test
pre-commit install
Expand Down
2 changes: 2 additions & 0 deletions mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@ posthog/ai/utils.py:0: error: Function "builtins.any" is not valid as a type [v
posthog/ai/utils.py:0: note: Perhaps you meant "typing.Any" instead of "any"?
posthog/ai/utils.py:0: error: Function "builtins.any" is not valid as a type [valid-type]
posthog/ai/utils.py:0: note: Perhaps you meant "typing.Any" instead of "any"?
posthog/client.py:0: error: Name "urlparse" already defined (possibly by an import) [no-redef]
posthog/client.py:0: error: Name "parse_qs" already defined (possibly by an import) [no-redef]
162 changes: 161 additions & 1 deletion posthog/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
to_values,
)
from posthog.utils import (
FlagCache,
RedisFlagCache,
SizeLimitedDict,
clean,
guess_timezone,
Expand Down Expand Up @@ -95,7 +97,30 @@ def add_context_tags(properties):


class Client(object):
"""Create a new PostHog client."""
"""Create a new PostHog client.

Examples:
Basic usage:
>>> client = Client("your-api-key")

With memory-based feature flag fallback cache:
>>> client = Client(
... "your-api-key",
... flag_fallback_cache_url="memory://local/?ttl=300&size=10000"
... )

With Redis fallback cache for high-scale applications:
>>> client = Client(
... "your-api-key",
... flag_fallback_cache_url="redis://localhost:6379/0/?ttl=300"
... )
Comment on lines +109 to +116
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd propose a slightly different API, similar to how we do configs on posthog-js

# In-memory
client = Client("api-key", flag_fallback_cache=True)

# Redis URL
client = Client("api-key", flag_fallback_cache=FlagFallbackCache(backend="redis", url="redis://localhost:6379/0")

# Existing redis
client = Client("api-key", flag_fallback_cache=FlagFallbackCache(backend="redis", client=redis_client)

This avoids us flooding the main args list and offloads all of the config-handling to a separate class. That class can also handle instantiating the cache, for example. I believe the code would look much more modular that way.

Copy link
Copy Markdown
Contributor

@oliverb123 oliverb123 Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why put the service name in the parameters at all, it's right there in the URL schema (I would just pass fallback_cache_url='redis://...', and then list the schema's we support as we extend support to further backend systems)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because someone might pass a custom client we don't recognize (a custom redis wrapper) and we need to know how to use it. I'm in favor of specifying the service. OR we make it optional, and fail in case we can't use it (or just don't use it and log a warning)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to specifically prevent people passing clients here, though - for reasons outlined here - letting them pass a custom client is the same as defining an interface, except now we'd need to define one for every cache service rather than an abstraction.

I also can't find places we do this custom wrapper stuff in posthog-js?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @oliverb123 for this first implementation. The less surface area we commit to, the better. Down the road, when our caching approach is mature, we could always add an interface for custom cache providers.


With Redis authentication:
>>> client = Client(
... "your-api-key",
... flag_fallback_cache_url="redis://username:password@localhost:6379/0/?ttl=300"
... )
"""
Comment on lines +100 to +123
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this overkill to include in the docstring? makes it seem like this is the only thing worth passing to the Client, but I didn't know where else to put the docs.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine, for now at least. Documenting the behaviour of these args on posthog.com seems like the generally accepted approach


log = logging.getLogger("posthog")

Expand Down Expand Up @@ -126,6 +151,7 @@ def __init__(
project_root=None,
privacy_mode=False,
before_send=None,
flag_fallback_cache_url=None,
):
self.queue = queue.Queue(max_queue_size)

Expand All @@ -151,6 +177,8 @@ def __init__(
)
self.poller = None
self.distinct_ids_feature_flags_reported = SizeLimitedDict(MAX_DICT_SIZE, set)
self.flag_cache = self._initialize_flag_cache(flag_fallback_cache_url)
self.flag_definition_version = 0
self.disabled = disabled
self.disable_geoip = disable_geoip
self.historical_migration = historical_migration
Expand Down Expand Up @@ -707,6 +735,9 @@ def shutdown(self):

def _load_feature_flags(self):
try:
# Store old flags to detect changes
old_flags_by_key: dict[str, dict] = self.feature_flags_by_key or {}

response = get(
self.personal_api_key,
f"/api/feature_flag/local_evaluation/?token={self.api_key}&send_cohorts",
Expand All @@ -718,6 +749,14 @@ def _load_feature_flags(self):
self.group_type_mapping = response["group_type_mapping"] or {}
self.cohorts = response["cohorts"] or {}

# Check if flag definitions changed and update version
if self.flag_cache and old_flags_by_key != (
self.feature_flags_by_key or {}
):
old_version = self.flag_definition_version
self.flag_definition_version += 1
self.flag_cache.invalidate_version(old_version)
Comment thread
dmarticus marked this conversation as resolved.

except APIError as e:
if e.status == 401:
self.log.error(
Expand All @@ -739,6 +778,10 @@ def _load_feature_flags(self):
self.group_type_mapping = {}
self.cohorts = {}

# Clear flag cache when quota limited
if self.flag_cache:
self.flag_cache.clear()

if self.debug:
raise APIError(
status=402,
Expand Down Expand Up @@ -889,6 +932,12 @@ def _get_feature_flag_result(
flag_result = FeatureFlagResult.from_value_and_payload(
key, lookup_match_value, payload
)

# Cache successful local evaluation
if self.flag_cache and flag_result:
self.flag_cache.set_cached_flag(
distinct_id, key, flag_result, self.flag_definition_version
)
elif not only_evaluate_locally:
try:
flag_details, request_id = self._get_feature_flag_details_from_decide(
Expand All @@ -902,12 +951,30 @@ def _get_feature_flag_result(
flag_result = FeatureFlagResult.from_flag_details(
flag_details, override_match_value
)

# Cache successful remote evaluation
if self.flag_cache and flag_result:
self.flag_cache.set_cached_flag(
distinct_id, key, flag_result, self.flag_definition_version
)

self.log.debug(
f"Successfully computed flag remotely: #{key} -> #{flag_result}"
)
except Exception as e:
self.log.exception(f"[FEATURE FLAGS] Unable to get flag remotely: {e}")

# Fallback to cached value if remote evaluation fails
if self.flag_cache:
stale_result = self.flag_cache.get_stale_cached_flag(
distinct_id, key
)
if stale_result:
self.log.info(
f"[FEATURE FLAGS] Using stale cached value for flag {key}"
)
flag_result = stale_result

if send_feature_flag_events:
self._capture_feature_flag_called(
distinct_id,
Expand Down Expand Up @@ -1278,6 +1345,99 @@ def _get_all_flags_and_payloads_locally(
"featureFlagPayloads": payloads,
}, fallback_to_decide

def _initialize_flag_cache(self, cache_url):
    """Initialize feature flag cache for graceful degradation during service outages.

    When enabled, the cache stores flag evaluation results and serves them as fallback
    when the PostHog API is unavailable. This ensures your application continues to
    receive flag values even during outages.

    Args:
        cache_url: Cache configuration URL. Examples:
            - None: Disable caching
            - "memory://local/?ttl=300&size=10000": Memory cache with TTL and size
            - "redis://localhost:6379/0/?ttl=300": Redis cache with TTL
            - "redis://username:password@host:port/?ttl=300": Redis with auth

    Returns:
        A FlagCache / RedisFlagCache instance, or None when caching is disabled
        or could not be initialized (never raises; failures are logged as warnings).

    Example usage:
        # Memory cache
        client = Client(
            "your-api-key",
            flag_fallback_cache_url="memory://local/?ttl=300&size=10000"
        )

        # Redis cache
        client = Client(
            "your-api-key",
            flag_fallback_cache_url="redis://localhost:6379/0/?ttl=300"
        )

        # Normal evaluation - cache is populated
        flag_value = client.get_feature_flag("my-flag", "user123")

        # During API outage - returns cached value instead of None
        flag_value = client.get_feature_flag("my-flag", "user123")  # Uses cache
    """
    if not cache_url:
        return None

    # The project requires Python 3 (see README: `uv python pin 3.9.19`), so the
    # old `try: ... except ImportError: from urlparse import ...` Python 2
    # fallback was dead code — and it redefined these names, triggering mypy's
    # [no-redef] errors recorded in mypy-baseline.txt. Import directly instead.
    from urllib.parse import parse_qs, urlparse

    try:
        parsed = urlparse(cache_url)
        scheme = parsed.scheme.lower()
        query_params = parse_qs(parsed.query)
        # Default TTL is 5 minutes when not supplied in the query string.
        ttl = int(query_params.get("ttl", [300])[0])

        if scheme == "memory":
            size = int(query_params.get("size", [10000])[0])
            return FlagCache(size, ttl)

        elif scheme == "redis":
            try:
                # Not worth importing redis if we're not using it
                import redis

                # Rebuild a normalized redis URL, filling in host/port defaults.
                redis_url = f"{parsed.scheme}://"
                if parsed.username or parsed.password:
                    redis_url += f"{parsed.username or ''}:{parsed.password or ''}@"
                redis_url += (
                    f"{parsed.hostname or 'localhost'}:{parsed.port or 6379}"
                )
                if parsed.path:
                    redis_url += parsed.path

                client = redis.from_url(redis_url)

                # Test connection before using it
                client.ping()

                return RedisFlagCache(client, default_ttl=ttl)

            except ImportError:
                # Deliberately do NOT fall back to a memory cache here: do what
                # was asked, or not at all (see review discussion).
                self.log.warning(
                    "[FEATURE FLAGS] Redis not available, flag caching disabled"
                )
                return None
            except Exception as e:
                self.log.warning(
                    f"[FEATURE FLAGS] Redis connection failed: {e}, flag caching disabled"
                )
                return None

        else:
            raise ValueError(
                f"Unknown cache URL scheme: {scheme}. Supported schemes: memory, redis"
            )

    except Exception as e:
        # Any parse/config failure disables caching with a warning rather than
        # breaking client construction.
        self.log.warning(
            f"[FEATURE FLAGS] Failed to parse cache URL '{cache_url}': {e}"
        )
        return None

def feature_flag_definitions(self):
return self.feature_flags

Expand Down
123 changes: 123 additions & 0 deletions posthog/test/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import unittest
from dataclasses import dataclass
from datetime import date, datetime, timedelta
Expand All @@ -12,6 +13,7 @@
from pydantic.v1 import BaseModel as BaseModelV1

from posthog import utils
from posthog.types import FeatureFlagResult

TEST_API_KEY = "kOOlRy2QlMY9jHZQv0bKz0FZyazBUoY8Arj0lFVNjs4"
FAKE_TEST_API_KEY = "random_key"
Expand Down Expand Up @@ -173,3 +175,124 @@ class TestDataClass:
"inner_optional": None,
},
}


class TestFlagCache(unittest.TestCase):
    """Exercises utils.FlagCache: basic get/set, TTL expiry, version
    invalidation, stale reads, and LRU eviction."""

    def setUp(self):
        # Tiny cache (3 entries, 1s TTL) so eviction and expiry are fast to test.
        self.cache = utils.FlagCache(max_size=3, default_ttl=1)
        self.flag_result = FeatureFlagResult.from_value_and_payload(
            "test-flag", True, None
        )

    def test_cache_basic_operations(self):
        uid, key, version = "user123", "test-flag", 1

        # Miss before anything is stored.
        assert self.cache.get_cached_flag(uid, key, version) is None

        # Store, then hit.
        self.cache.set_cached_flag(uid, key, self.flag_result, version)
        hit = self.cache.get_cached_flag(uid, key, version)
        assert hit is not None
        assert hit.get_value()

    def test_cache_ttl_expiration(self):
        uid, key, version = "user123", "test-flag", 1

        self.cache.set_cached_flag(uid, key, self.flag_result, version)

        # Fresh immediately after the write.
        assert self.cache.get_cached_flag(uid, key, version) is not None

        # Sleep past the 1s TTL (plus a small buffer) and expect a miss.
        time.sleep(1.1)
        assert self.cache.get_cached_flag(uid, key, version) is None

    def test_cache_version_invalidation(self):
        uid, key = "user123", "test-flag"
        old_version, new_version = 1, 2

        self.cache.set_cached_flag(uid, key, self.flag_result, old_version)

        # Hit at the version it was written with; miss at a newer version.
        assert self.cache.get_cached_flag(uid, key, old_version) is not None
        assert self.cache.get_cached_flag(uid, key, new_version) is None

        # After explicit invalidation, even the old version misses.
        self.cache.invalidate_version(old_version)
        assert self.cache.get_cached_flag(uid, key, old_version) is None

    def test_stale_cache_functionality(self):
        uid, key, version = "user123", "test-flag", 1

        self.cache.set_cached_flag(uid, key, self.flag_result, version)

        # Let the TTL lapse so the entry is stale rather than fresh.
        time.sleep(1.1)

        # Fresh lookup misses...
        assert self.cache.get_cached_flag(uid, key, version) is None

        # ...but the stale accessor still returns it (within 1 hour default).
        stale = self.cache.get_stale_cached_flag(uid, key)
        assert stale is not None
        assert stale.get_value()

    def test_lru_eviction(self):
        # max_size=3, so a 4th distinct user forces eviction of the LRU entry.
        version = 1

        for idx in range(3):
            self.cache.set_cached_flag(
                f"user{idx}", "test-flag", self.flag_result, version
            )

        # Touch user0 so it becomes the most recently used.
        self.cache.get_cached_flag("user0", "test-flag", version)

        # Inserting user3 should evict user1 (the least recently used).
        self.cache.set_cached_flag("user3", "test-flag", self.flag_result, version)

        # Recently-accessed, recently-added, and just-added entries survive.
        assert self.cache.get_cached_flag("user0", "test-flag", version) is not None
        assert self.cache.get_cached_flag("user2", "test-flag", version) is not None
        assert self.cache.get_cached_flag("user3", "test-flag", version) is not None
Loading
Loading