Merged · Changes from 4 commits
2 changes: 1 addition & 1 deletion README.md
@@ -32,7 +32,7 @@ We recommend using [uv](https://docs.astral.sh/uv/). It's super fast.
```bash
uv python install 3.9.19
uv python pin 3.9.19
-uv venv env
+uv venv
source env/bin/activate
uv sync --extra dev --extra test
pre-commit install
153 changes: 152 additions & 1 deletion posthog/client.py
@@ -50,6 +50,8 @@
to_values,
)
from posthog.utils import (
FlagCache,
RedisFlagCache,
SizeLimitedDict,
clean,
guess_timezone,
@@ -95,7 +97,33 @@ def add_context_tags(properties):


class Client(object):
-"""Create a new PostHog client."""
+"""Create a new PostHog client.

Examples:
Basic usage:
>>> client = Client("your-api-key")

With feature flag fallback cache (memory):
>>> client = Client("your-api-key", enable_flag_fallback_cache=True)

With Redis fallback cache for high-scale applications:
>>> client = Client(
... "your-api-key",
... enable_flag_fallback_cache=True,
... flag_fallback_cache_backend="redis",
... flag_fallback_cache_redis_url="redis://localhost:6379/0"
... )
Comment on lines +109 to +116

Member commented:
I'd propose a slightly different API, similar to how we do configs on posthog-js

# In-memory
client = Client("api-key", flag_fallback_cache=True)

# Redis URL
client = Client("api-key", flag_fallback_cache=FlagFallbackCache(backend="redis", url="redis://localhost:6379/0"))

# Existing redis
client = Client("api-key", flag_fallback_cache=FlagFallbackCache(backend="redis", client=redis_client))

This avoids us flooding the main args list and offloads all of the config-handling to a separate class. That class can also handle instantiating the cache, for example. I believe the code would look much more modular that way.
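A rough sketch of what that proposed config object might look like (the `FlagFallbackCache` name comes from the comment above; the fields and `build` method are assumptions for illustration, not the shipped API):

```python
# Hypothetical config object: all cache settings live in one place instead
# of flooding Client.__init__ with flag_fallback_cache_* parameters.
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FlagFallbackCache:
    backend: str = "memory"       # "memory" or "redis"
    url: Optional[str] = None     # e.g. "redis://localhost:6379/0"
    client: Optional[Any] = None  # pre-built redis client, if any
    size: int = 10000             # max cached entries (memory backend)
    ttl: int = 300                # freshness window in seconds

    def build(self) -> dict:
        """Resolve this config into a backend descriptor (toy stand-in for
        instantiating the real cache class)."""
        if self.backend == "memory":
            return {"kind": "memory", "size": self.size, "ttl": self.ttl}
        if self.backend == "redis":
            if self.client is None and self.url is None:
                raise ValueError("redis backend needs a url or a client")
            return {"kind": "redis", "ttl": self.ttl}
        raise ValueError(f"unknown backend: {self.backend}")
```

The payoff is that `Client.__init__` would take a single `flag_fallback_cache` argument, and validation plus instantiation move into the config class.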

Contributor @oliverb123 commented on Jul 1, 2025:

Why put the service name in the parameters at all, it's right there in the URL schema (I would just pass fallback_cache_url='redis://...', and then list the schema's we support as we extend support to further backend systems)

Member commented:

Because someone might pass a custom client we don't recognize (a custom redis wrapper) and we need to know how to use that. I'm in favor of specifying the service. Or we make it optional, and fail in case we can't use it (or just don't use it and log a warning).

Contributor commented:

I think we want to specifically prevent people passing clients here, though - for reasons outlined here - letting them pass a custom client is the same as defining an interface, except now we'd need to define one for every cache service rather than an abstraction.

I also can't find places we do this custom wrapper stuff in posthog-js?

Contributor commented:

Agree with @oliverb123 for this first implementation. The less surface area we commit to, the better. Down the road, when our caching approach is mature, we could always add an interface for custom cache providers.
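The "interface for custom cache providers" floated above could eventually look something like this (a hypothetical sketch — `FlagCacheBackend` and `DictBackend` are illustrative names, not part of this PR):

```python
# A minimal cache-backend protocol: any backend implementing these two
# methods could plug in later, without the SDK special-casing individual
# cache services.
from typing import Optional, Protocol


class FlagCacheBackend(Protocol):
    def get(self, key: str) -> Optional[str]: ...
    def set(self, key: str, value: str, ttl: int) -> None: ...


class DictBackend:
    """Trivial in-process backend, used here only to show the protocol."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value, ttl):
        # TTL is ignored in this toy backend; a real one would honor it.
        self._data[key] = value
```

Keeping the surface area small now, as the thread suggests, leaves room to publish a protocol like this once the caching approach matures.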


With existing Redis client:
>>> import redis
>>> redis_client = redis.Redis(host='localhost', port=6379, db=0)
>>> client = Client(
... "your-api-key",
... enable_flag_fallback_cache=True,
... flag_fallback_cache_backend="redis",
... flag_fallback_cache_redis_client=redis_client
... )
"""
Comment on lines +100 to +123

Contributor (author) commented:
Is this overkill to include in the docstring? It makes it seem like this is the only thing worth passing to the Client, but I didn't know where else to put the docs.

Contributor commented:

I think it's fine, for now at least. Documenting the behaviour of these args on posthog.com seems like the generally accepted approach


log = logging.getLogger("posthog")

@@ -126,6 +154,12 @@ def __init__(
project_root=None,
privacy_mode=False,
before_send=None,
enable_flag_fallback_cache=False,
Contributor commented:
This param seems overkill - if the URL isn't None, it's enabled

flag_fallback_cache_size=10000,
Contributor commented:
I can't tell what the unit is here, and don't think a user will be able to easily either? See note below, seems backend-specific so I'd put it in a url param.

Contributor (author) commented:

good callout (the unit is entries and that's clearly not obvious). Agreed about your note with URL params, makes the abstraction even cleaner. Was trying to get too cute here.

flag_fallback_cache_ttl=300,
flag_fallback_cache_backend="memory",
Contributor commented:
As above - you'll need to accept URLs for this anyway, and URLs are fundamentally a serialisation format to describe an access protocol and a location. I'd put these as query parameters into the URL, where they're relevant, like:

  • memory://local/?ttl=300&size=1000
  • redis://username:password@host:port/?some_param=some_value.

This has the advantage of being universal across languages, so we can expose it in a consistent manner (avoiding interface skew a bit), and letting you take vastly different arguments depending on the backend without an endlessly growing list of parameters or wrapper/enum types exposed
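The URL-as-config idea above could be parsed with the standard library alone — a sketch under the assumption that the scheme selects the backend and query parameters carry backend-specific settings (`parse_fallback_cache_url` is an illustrative name, not shipped code):

```python
# Scheme picks the backend; query params carry backend-specific settings,
# so no extra constructor arguments are needed per backend.
from urllib.parse import parse_qs, urlparse


def parse_fallback_cache_url(url: str) -> dict:
    parsed = urlparse(url)
    params = {k: v[0] for k, v in parse_qs(parsed.query).items()}
    if parsed.scheme == "memory":
        return {
            "backend": "memory",
            "size": int(params.get("size", 10000)),
            "ttl": int(params.get("ttl", 300)),
        }
    if parsed.scheme == "redis":
        # Pass the URL through untouched; redis.from_url understands it.
        return {"backend": "redis", "url": url, "ttl": int(params.get("ttl", 300))}
    raise ValueError(f"unsupported cache scheme: {parsed.scheme}")
```

Adding a new backend then means adding one scheme branch, keeping the constructor signature stable across SDK languages.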

flag_fallback_cache_redis_url=None,
flag_fallback_cache_redis_client=None,
):
self.queue = queue.Queue(max_queue_size)

@@ -151,6 +185,15 @@
)
self.poller = None
self.distinct_ids_feature_flags_reported = SizeLimitedDict(MAX_DICT_SIZE, set)
self.flag_cache = self._initialize_flag_cache(
enable_flag_fallback_cache,
flag_fallback_cache_backend,
flag_fallback_cache_size,
flag_fallback_cache_ttl,
flag_fallback_cache_redis_url,
flag_fallback_cache_redis_client,
)
self.flag_definition_version = 0
self.disabled = disabled
self.disable_geoip = disable_geoip
self.historical_migration = historical_migration
@@ -707,6 +750,9 @@ def shutdown(self):

def _load_feature_flags(self):
try:
# Store old flags to detect changes
old_flags_by_key: dict[str, dict] = self.feature_flags_by_key or {}

response = get(
self.personal_api_key,
f"/api/feature_flag/local_evaluation/?token={self.api_key}&send_cohorts",
@@ -718,6 +764,14 @@
self.group_type_mapping = response["group_type_mapping"] or {}
self.cohorts = response["cohorts"] or {}

# Check if flag definitions changed and update version
if self.flag_cache and old_flags_by_key != (
self.feature_flags_by_key or {}
):
old_version = self.flag_definition_version
self.flag_definition_version += 1
self.flag_cache.invalidate_version(old_version)
dmarticus marked this conversation as resolved.

except APIError as e:
if e.status == 401:
self.log.error(
@@ -739,6 +793,10 @@
self.group_type_mapping = {}
self.cohorts = {}

# Clear flag cache when quota limited
if self.flag_cache:
self.flag_cache.clear()

if self.debug:
raise APIError(
status=402,
@@ -889,6 +947,12 @@ def _get_feature_flag_result(
flag_result = FeatureFlagResult.from_value_and_payload(
key, lookup_match_value, payload
)

# Cache successful local evaluation
if self.flag_cache and flag_result:
self.flag_cache.set_cached_flag(
distinct_id, key, flag_result, self.flag_definition_version
)
elif not only_evaluate_locally:
try:
flag_details, request_id = self._get_feature_flag_details_from_decide(
@@ -902,12 +966,30 @@
flag_result = FeatureFlagResult.from_flag_details(
flag_details, override_match_value
)

# Cache successful remote evaluation
if self.flag_cache and flag_result:
self.flag_cache.set_cached_flag(
distinct_id, key, flag_result, self.flag_definition_version
)

self.log.debug(
f"Successfully computed flag remotely: {key} -> {flag_result}"
)
except Exception as e:
self.log.exception(f"[FEATURE FLAGS] Unable to get flag remotely: {e}")

# Fallback to cached value if remote evaluation fails
if self.flag_cache:
stale_result = self.flag_cache.get_stale_cached_flag(
distinct_id, key
)
if stale_result:
self.log.info(
f"[FEATURE FLAGS] Using stale cached value for flag {key}"
)
flag_result = stale_result

if send_feature_flag_events:
self._capture_feature_flag_called(
distinct_id,
@@ -1278,6 +1360,75 @@ def _get_all_flags_and_payloads_locally(
"featureFlagPayloads": payloads,
}, fallback_to_decide

def _initialize_flag_cache(
self,
enable_flag_fallback_cache,
backend,
cache_size,
cache_ttl,
redis_url,
redis_client,
):
"""Initialize feature flag cache for graceful degradation during service outages.

When enabled, the cache stores flag evaluation results and serves them as fallback
when the PostHog API is unavailable. This ensures your application continues to
receive flag values even during outages.

Example Redis usage:
client = Client(
"your-api-key",
enable_flag_fallback_cache=True,
flag_fallback_cache_backend="redis",
flag_fallback_cache_redis_url="redis://localhost:6379/0"
)

# Normal evaluation - cache is populated
flag_value = client.get_feature_flag("my-flag", "user123")

# During API outage - returns cached value instead of None
flag_value = client.get_feature_flag("my-flag", "user123") # Uses cache
"""
if not enable_flag_fallback_cache:
return None

if backend == "redis":
try:
# Try to import redis
import redis

# Use provided client or create from URL
if redis_client:
client = redis_client
elif redis_url:
client = redis.from_url(redis_url)
else:
raise ValueError(
"Redis backend requires either flag_fallback_cache_redis_url or flag_fallback_cache_redis_client"
)

# Test connection
client.ping()

return RedisFlagCache(client, default_ttl=cache_ttl)

except ImportError:
self.log.warning(
"[FEATURE FLAGS] Redis not available, falling back to memory cache"
)
return FlagCache(cache_size, cache_ttl)
except Exception as e:
self.log.warning(
f"[FEATURE FLAGS] Redis connection failed: {e}, falling back to memory cache"
)
return FlagCache(cache_size, cache_ttl)

elif backend == "memory":
return FlagCache(cache_size, cache_ttl)

else:
raise ValueError(f"Unknown flag cache backend: {backend}")

def feature_flag_definitions(self):
return self.feature_flags

123 changes: 123 additions & 0 deletions posthog/test/test_utils.py
@@ -1,3 +1,4 @@
import time
import unittest
from dataclasses import dataclass
from datetime import date, datetime, timedelta
@@ -12,6 +13,7 @@
from pydantic.v1 import BaseModel as BaseModelV1

from posthog import utils
from posthog.types import FeatureFlagResult

TEST_API_KEY = "kOOlRy2QlMY9jHZQv0bKz0FZyazBUoY8Arj0lFVNjs4"
FAKE_TEST_API_KEY = "random_key"
@@ -173,3 +175,124 @@ class TestDataClass:
"inner_optional": None,
},
}


class TestFlagCache(unittest.TestCase):
def setUp(self):
self.cache = utils.FlagCache(max_size=3, default_ttl=1)
self.flag_result = FeatureFlagResult.from_value_and_payload(
"test-flag", True, None
)

def test_cache_basic_operations(self):
distinct_id = "user123"
flag_key = "test-flag"
flag_version = 1

# Test cache miss
result = self.cache.get_cached_flag(distinct_id, flag_key, flag_version)
assert result is None

# Test cache set and hit
self.cache.set_cached_flag(
distinct_id, flag_key, self.flag_result, flag_version
)
result = self.cache.get_cached_flag(distinct_id, flag_key, flag_version)
assert result is not None
assert result.get_value()

def test_cache_ttl_expiration(self):
distinct_id = "user123"
flag_key = "test-flag"
flag_version = 1

# Set flag in cache
self.cache.set_cached_flag(
distinct_id, flag_key, self.flag_result, flag_version
)

# Should be available immediately
result = self.cache.get_cached_flag(distinct_id, flag_key, flag_version)
assert result is not None

# Wait for TTL to expire (1 second + buffer)
time.sleep(1.1)

# Should be expired
result = self.cache.get_cached_flag(distinct_id, flag_key, flag_version)
assert result is None

def test_cache_version_invalidation(self):
distinct_id = "user123"
flag_key = "test-flag"
old_version = 1
new_version = 2

# Set flag with old version
self.cache.set_cached_flag(distinct_id, flag_key, self.flag_result, old_version)

# Should hit with old version
result = self.cache.get_cached_flag(distinct_id, flag_key, old_version)
assert result is not None

# Should miss with new version
result = self.cache.get_cached_flag(distinct_id, flag_key, new_version)
assert result is None

# Invalidate old version
self.cache.invalidate_version(old_version)

# Should miss even with old version after invalidation
result = self.cache.get_cached_flag(distinct_id, flag_key, old_version)
assert result is None

def test_stale_cache_functionality(self):
distinct_id = "user123"
flag_key = "test-flag"
flag_version = 1

# Set flag in cache
self.cache.set_cached_flag(
distinct_id, flag_key, self.flag_result, flag_version
)

# Wait for TTL to expire
time.sleep(1.1)

# Should not get fresh cache
result = self.cache.get_cached_flag(distinct_id, flag_key, flag_version)
assert result is None

# Should get stale cache (within 1 hour default)
stale_result = self.cache.get_stale_cached_flag(distinct_id, flag_key)
assert stale_result is not None
assert stale_result.get_value()

def test_lru_eviction(self):
# Cache has max_size=3, so adding 4 users should evict the LRU one
flag_version = 1

# Add 3 users
for i in range(3):
user_id = f"user{i}"
self.cache.set_cached_flag(
user_id, "test-flag", self.flag_result, flag_version
)

# Access user0 to make it recently used
self.cache.get_cached_flag("user0", "test-flag", flag_version)

# Add 4th user, should evict user1 (least recently used)
self.cache.set_cached_flag("user3", "test-flag", self.flag_result, flag_version)

# user0 should still be there (was recently accessed)
result = self.cache.get_cached_flag("user0", "test-flag", flag_version)
assert result is not None

# user2 should still be there (was recently added)
result = self.cache.get_cached_flag("user2", "test-flag", flag_version)
assert result is not None

# user3 should be there (just added)
result = self.cache.get_cached_flag("user3", "test-flag", flag_version)
assert result is not None
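The behaviors these tests exercise — freshness TTL, version-based invalidation, stale reads, and per-user LRU eviction — can be satisfied by a minimal in-memory cache along these lines (an illustrative sketch; the shipped `posthog.utils.FlagCache` may be implemented differently):

```python
# Minimal in-memory flag cache: an OrderedDict of users provides LRU
# eviction, and each entry stores (result, flag_definition_version,
# timestamp) so both TTL and version checks can be applied on read.
import time
from collections import OrderedDict


class FlagCache:
    def __init__(self, max_size, default_ttl, stale_ttl=3600):
        self.max_size = max_size        # max number of distinct_ids kept
        self.default_ttl = default_ttl  # freshness window, seconds
        self.stale_ttl = stale_ttl      # how long stale entries survive
        self._users = OrderedDict()     # distinct_id -> {key: (result, version, ts)}

    def set_cached_flag(self, distinct_id, key, result, version):
        flags = self._users.pop(distinct_id, {})
        flags[key] = (result, version, time.time())
        self._users[distinct_id] = flags        # re-insert as most recent
        while len(self._users) > self.max_size:
            self._users.popitem(last=False)     # evict least recently used

    def get_cached_flag(self, distinct_id, key, version):
        entry = self._lookup(distinct_id, key)
        if entry is None:
            return None
        result, entry_version, ts = entry
        if entry_version != version or time.time() - ts > self.default_ttl:
            return None                         # stale or wrong version
        return result

    def get_stale_cached_flag(self, distinct_id, key):
        entry = self._lookup(distinct_id, key)
        if entry is None:
            return None
        result, _version, ts = entry
        return result if time.time() - ts <= self.stale_ttl else None

    def invalidate_version(self, version):
        for flags in self._users.values():
            for key in [k for k, v in flags.items() if v[1] == version]:
                del flags[key]

    def _lookup(self, distinct_id, key):
        flags = self._users.get(distinct_id)
        if flags is None or key not in flags:
            return None
        self._users.move_to_end(distinct_id)    # mark recently used
        return flags[key]
```

Keeping the freshness check (`get_cached_flag`) separate from the stale read (`get_stale_cached_flag`) lets the client prefer fresh values during normal operation while still serving something during an outage.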