Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions source-sage-intacct/source_sage_intacct/api.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
from datetime import UTC, datetime, timedelta
from logging import Logger
from typing import Any, AsyncGenerator, Callable, TypeVar, Union
from typing import Any, AsyncGenerator, Callable, TypeVar

import estuary_cdk.emitted_changes_cache as cache
from estuary_cdk.capture.common import LogCursor, PageCursor

from .models import (
CreationRecord,
DeletionRecord,
IncrementalResource,
SnapshotResource,
parse_backfill_record,
)
from .sage import Sage

T = TypeVar("T", IncrementalResource, DeletionRecord)
T = TypeVar("T", IncrementalResource, CreationRecord, DeletionRecord)


# This function is kind of intense, but what it's doing is repeatedly issuing
Expand Down Expand Up @@ -57,20 +59,21 @@ async def _fetch_records_generic(
this_count = 0
async for rec in fetch_since(object, cursor):
rec = cls.model_validate(rec.model_dump())
if last_record_ts and rec.WHENMODIFIED != last_record_ts:
rec_ts = rec.cursor_value()
if last_record_ts and rec_ts != last_record_ts:
# Got a timestamp change, which means the timestamp of the prior
# record has been fully read.
last_completed_ts = last_record_ts

if horizon and rec.WHENMODIFIED > now - horizon:
if horizon and rec_ts > now - horizon:
# Stop short if the record is too recent. This will trigger the
# "read fewer documents than the page size" condition a little
# further down.
break

this_count += 1
last_record_ts = rec.WHENMODIFIED
if cache.should_yield(object, getattr(rec, id_field), rec.WHENMODIFIED):
last_record_ts = rec_ts
if cache.should_yield(object, getattr(rec, id_field), rec_ts):
yield rec

total_count += this_count
Expand Down Expand Up @@ -102,7 +105,7 @@ async def _fetch_records_generic(
rec = cls.model_validate(rec.model_dump())
last_record_identifier = getattr(rec, id_field)
this_count += 1
if cache.should_yield(object, getattr(rec, id_field), rec.WHENMODIFIED):
if cache.should_yield(object, getattr(rec, id_field), rec.cursor_value()):
yield rec

if this_count < page_size:
Expand Down Expand Up @@ -133,6 +136,27 @@ async def fetch_changes(
yield item


async def fetch_creations(
    object: str,
    sage: Sage,
    horizon: timedelta | None,
    page_size: int,
    log: Logger,
    cursor: LogCursor,
) -> AsyncGenerator[CreationRecord | LogCursor, None]:
    """Yield CreationRecords for `object` created since `cursor`.

    Thin wrapper around _fetch_records_generic wired to the
    WHENCREATED-keyed Sage queries (fetch_created_since /
    fetch_created_at), so records with a null WHENMODIFIED - which the
    WHENMODIFIED-keyed incremental query cannot see - are still captured.
    Yields updated LogCursor values interleaved with records, exactly as
    _fetch_records_generic produces them.
    """
    stream = _fetch_records_generic(
        object=object,
        cls=CreationRecord,
        id_field="RECORDNO",
        horizon=horizon,
        page_size=page_size,
        cursor=cursor,
        fetch_since=sage.fetch_created_since,
        fetch_at=sage.fetch_created_at,
    )
    async for item in stream:
        yield item


async def fetch_deletions(
object: str,
sage: Sage,
Expand Down Expand Up @@ -164,18 +188,18 @@ async def fetch_page(
log: Logger,
page: PageCursor,
cutoff: LogCursor,
) -> AsyncGenerator[IncrementalResource | PageCursor, None]:
) -> AsyncGenerator[IncrementalResource | CreationRecord | PageCursor, None]:
assert isinstance(page, int | None)
assert isinstance(cutoff, datetime)

last: int | None = None
count = 0
async for rec in sage.fetch_all(object, page):
rec = IncrementalResource.model_validate(rec.model_dump())
last = rec.RECORDNO
doc = parse_backfill_record(rec.model_dump())
last = doc.RECORDNO
count += 1
if rec.WHENMODIFIED < cutoff:
yield rec
if doc.cursor_value() < cutoff:
yield doc

if count < page_size:
return
Expand Down
43 changes: 41 additions & 2 deletions source-sage-intacct/source_sage_intacct/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@ def __str__(self) -> str:
return " - ".join(parts) if parts else "unspecified Sage Intacct error"

def is_permission_error(self) -> bool:
return self.errorno == "PL04000005"
if self.errorno == "PL04000005":
return True
# AUDITHISTORY denials arrive without an `errorno`, so fall
# back to matching the literal description Sage emits.
if self.description2 and "do not have permission to view audit history" in self.description2.lower():
return True
return False

error: list[Error] | Error

Expand Down Expand Up @@ -194,10 +200,40 @@ class SnapshotResource(BaseDocument, extra="allow"):
RECORDNO: Optional[int] = Field(default=None, exclude=True)


class IncrementalResource(BaseDocument, extra="allow"):
class Document(BaseDocument, extra="allow"):
    """Model used to derive the collection's write schema for incremental
    bindings.

    Only RECORDNO is required because the three runtime document shapes
    diverge on the rest: update docs carry WHENMODIFIED, creation docs
    carry WHENCREATED without WHENMODIFIED, and deletion docs carry
    WHENMODIFIED without WHENCREATED. Marking either timestamp required
    would reject one of those shapes during write-schema validation.
    """

    RECORDNO: int


class IncrementalResource(Document):
    """An updated record: carries a non-null WHENMODIFIED timestamp."""

    WHENMODIFIED: AwareDatetime

    def cursor_value(self) -> AwareDatetime:
        # Incremental sub-tasks for updates page through records ordered
        # by WHENMODIFIED, so that timestamp is this document's cursor.
        return self.WHENMODIFIED


class CreationRecord(Document):
    """A record that exists in Sage with a null WHENMODIFIED.

    Such records cannot be captured by the WHENMODIFIED-keyed incremental
    query, so a parallel sub-task keyed on WHENCREATED picks them up.
    """

    WHENCREATED: AwareDatetime

    def cursor_value(self) -> AwareDatetime:
        # The creations sub-tasks page through records ordered by
        # WHENCREATED, so that timestamp is this document's cursor.
        return self.WHENCREATED


def parse_backfill_record(raw: dict) -> "IncrementalResource | CreationRecord":
    """Validate a dumped backfill record into the matching document model.

    Records carrying a non-null WHENMODIFIED validate as IncrementalResource;
    anything else (WHENMODIFIED absent or None) validates as CreationRecord.
    """
    has_modified_ts = raw.get("WHENMODIFIED") is not None
    model = IncrementalResource if has_modified_ts else CreationRecord
    return model.model_validate(raw)
Comment on lines +220 to +235
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CreationRecord captures records that exist in Sage with a null WHENMODIFIED

Shouldn't parse_backfill_record also be checking if WHENMODIFIED is None?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't necessarily need to. parse_backfill_record is used to parse dumped SageRecords, and when SageRecords are dumped any None values are excluded:

kwargs.setdefault("exclude_none", True)

But it is safer to use raw.get("WHENMODIFIED") instead of "WHENMODIFIED" in raw when determining whether to validate with IncrementalResource or CreationRecord, in case SageRecord serialization behavior changes later. I've updated this check to cover the case where WHENMODIFIED is None too.



class DeletionRecord(BaseDocument, extra="forbid"):
RECORDNO: int = Field(alias="OBJECTKEY", serialization_alias="RECORDNO")
Expand All @@ -208,6 +244,9 @@ class DeletionRecord(BaseDocument, extra="forbid"):
default_factory=lambda: DeletionRecord.Meta(op="d")
)

    def cursor_value(self) -> AwareDatetime:
        # Deletion records are ordered by the audit entry's WHENMODIFIED
        # timestamp, so that is this document's cursor.
        return self.WHENMODIFIED

@field_validator("RECORDNO", mode="before")
@classmethod
def parse_object_key(cls, value: str) -> int:
Expand Down
153 changes: 114 additions & 39 deletions source-sage-intacct/source_sage_intacct/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,37 @@

from .api import (
fetch_changes,
fetch_creations,
fetch_deletions,
fetch_page,
snapshot,
)
from .models import (
ConnectorState,
Document,
EndpointConfig,
IncrementalResource,
ResourceConfig,
ResourceState,
SagePermissionError,
SnapshotResource,
)
from .sage import PAGE_SIZE, Sage

# REALTIME_LAG defers the "realtime" modified sub-task by this much so that
# a near-simultaneous create-then-modify can't race the "realtime_creations"
# sub-task into emitting an older creation record after a newer
# modification record has already been written.
REALTIME_LAG = timedelta(minutes=2)
LOOKBACK_LAG = timedelta(hours=2)
LOOKBACK_LAG_DELETIONS = timedelta(hours=2, minutes=30)

REALTIME = "realtime"
LOOKBACK = "lookback"
REALTIME_CREATIONS = "realtime_creations"
LOOKBACK_CREATIONS = "lookback_creations"
REALTIME_DELETIONS = "realtime_deletions"
LOOKBACK_DELETIONS = "lookback_deletions"

# INCREMENTAL_OBJECTS are those that have a WHENMODIFIED timestamp and support
# queries that filter and sort by that.
INCREMENTAL_OBJECTS = {
Expand Down Expand Up @@ -52,7 +68,7 @@


async def incremental_resource(
sage: Sage, obj: str, started_at: datetime
sage: Sage, obj: str, started_at: datetime, can_read_audit_history: bool
) -> common.Resource:
model = await sage.get_model(obj)

Expand All @@ -64,57 +80,107 @@ async def open(
all_bindings,
):
task.sourced_schema(binding_index, model.sourced_schema())
await task.checkpoint(state=ConnectorState())

# Add state to existing captures that were created before the
# realtime_creations / lookback_creations sub-tasks were added.
if (
isinstance(state.inc, dict)
and REALTIME_CREATIONS not in state.inc
and isinstance(existing := state.inc.get(REALTIME), ResourceState.Incremental)
and isinstance(existing.cursor, datetime)
):
current_cursor = existing.cursor
creation_state: dict[str, ResourceState.Incremental] = {
REALTIME_CREATIONS: ResourceState.Incremental(cursor=current_cursor),
LOOKBACK_CREATIONS: ResourceState.Incremental(cursor=current_cursor - LOOKBACK_LAG),
}
state.inc = {**state.inc, **creation_state}

task.log.info(
"Adding creation subtask state to incremental state.",
{"state.inc": state.inc},
)
await task.checkpoint(
ConnectorState(bindingStateV1={binding.stateKey: ResourceState(inc=creation_state)}),
)
else:
await task.checkpoint(state=ConnectorState())

# There are up to 6 separate subtasks for an incremental resource: 2
# for capturing updated documents, 2 for capturing newly-created
# records that haven't otherwise been modified, and 2 for capturing
# deletions. It is known that the Sage Intacct API is eventually
# consistent, so it is likely that the "realtime" subtasks will
# occasionally miss change events, and the "lookback" subtasks will
# follow behind to true-up the collection.
#
# The "realtime" modified subtask has a small REALTIME_LAG horizon.
# This guards against a race where a record is created with null
# WHENMODIFIED and then modified within seconds: without the lag, the
# modified subtask could emit the post-modification document before
# the creations subtask emits the pre-modification document, and the
# older document would overwrite the newer one. The lag gives the
# creations subtask time to poll and emit first.
#
# Capturing deletions is done in a separate subtask than creates &
# updates, since it requires using a different API for polling the
# audit history object. Note that the "horizon" for the deletions
# lookback subtask is 2.5 hours instead of 2 hours - this is to
# mitigate races where a record is created or updated and then
# immediately deleted, which could otherwise cause the deletion
# document to get captured before the create/update document.
# Delaying the deletions lookback by an additional amount of time
# should prevent such an out-of-order scenario.
subtasks = {
REALTIME: functools.partial(
fetch_changes, obj, sage, REALTIME_LAG, PAGE_SIZE
),
LOOKBACK: functools.partial(
fetch_changes, obj, sage, LOOKBACK_LAG, PAGE_SIZE
),
REALTIME_CREATIONS: functools.partial(
fetch_creations, obj, sage, None, PAGE_SIZE
),
LOOKBACK_CREATIONS: functools.partial(
fetch_creations, obj, sage, LOOKBACK_LAG, PAGE_SIZE
),
}
# The deletion subtasks are only wired up when the configured user
# has permission to read AUDITHISTORY. When the permission is
# missing, their state entries are still seeded in initial_state so
# that if the permission is granted later, the next connector
# restart will reattach the deletion subtasks to their preserved
# cursors and pick up deletions that occurred in the meantime.
if can_read_audit_history:
subtasks[REALTIME_DELETIONS] = functools.partial(
fetch_deletions, obj, sage, None, PAGE_SIZE
)
subtasks[LOOKBACK_DELETIONS] = functools.partial(
fetch_deletions, obj, sage, LOOKBACK_LAG_DELETIONS, PAGE_SIZE
)

common.open_binding(
binding,
binding_index,
state,
task,
fetch_changes={
# There are 4 separate subtasks for an incremental resource: 2
# for capturing created & updated documents, and 2 for capturing
# deletions. It is known that the Sage Intacct API is eventually
# consistent, so it is likely that the "realtime" subtasks will
# occasionally miss change events, and the "lookback" subtasks
# will follow behind to true-up the collection.
#
# Capturing deletions is done in a separate subtask than creates
# & updates, since it requires using a different API for polling
# the audit history object. Note that the "horizon" for the
# deletions lookback subtask is 2.5 hours instead of 2 hours -
# this is to mitigate races where a record is created or updated
# and then immediately deleted, which could otherwise cause the
# deletion document to get captured before the create/update
# document. Delaying the deletions lookback by an additional
# amount of time should prevent such an out-of-order scenario.
"realtime": functools.partial(
fetch_changes, obj, sage, None, PAGE_SIZE
),
"lookback": functools.partial(
fetch_changes, obj, sage, timedelta(hours=2), PAGE_SIZE
),
"realtime_deletions": functools.partial(
fetch_deletions, obj, sage, None, PAGE_SIZE
),
"lookback_deletions": functools.partial(
fetch_deletions, obj, sage, timedelta(hours=2.5), PAGE_SIZE
),
},
fetch_changes=subtasks,
fetch_page=functools.partial(fetch_page, obj, sage, PAGE_SIZE),
)

return common.Resource(
name=obj,
key=["/RECORDNO"],
model=IncrementalResource,
model=Document,
open=open,
initial_state=ResourceState(
inc={
"realtime": ResourceState.Incremental(cursor=started_at),
"lookback": ResourceState.Incremental(cursor=started_at),
"realtime_deletions": ResourceState.Incremental(cursor=started_at),
"lookback_deletions": ResourceState.Incremental(cursor=started_at),
REALTIME: ResourceState.Incremental(cursor=started_at),
LOOKBACK: ResourceState.Incremental(cursor=started_at),
REALTIME_CREATIONS: ResourceState.Incremental(cursor=started_at),
LOOKBACK_CREATIONS: ResourceState.Incremental(cursor=started_at),
REALTIME_DELETIONS: ResourceState.Incremental(cursor=started_at),
LOOKBACK_DELETIONS: ResourceState.Incremental(cursor=started_at),
},
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at),
),
Expand Down Expand Up @@ -183,9 +249,18 @@ async def _probe(obj: str) -> tuple[str, str | None]:
if denial is not None:
log.warning(f"omitting binding for {obj}: {denial}")

can_read_audit_history = True
try:
await sage.probe_query_permission("AUDITHISTORY")
except SagePermissionError as e:
can_read_audit_history = False
log.warning(
f"omitting deletion sub-tasks since AUDITHISTORY cannot be read: {e}. "
)

started_at = datetime.now(tz=UTC)
inc = [
incremental_resource(sage, obj, started_at)
incremental_resource(sage, obj, started_at, can_read_audit_history)
for obj in INCREMENTAL_OBJECTS
if obj in accessible_objects
]
Expand Down
Loading
Loading