diff --git a/source-sage-intacct/source_sage_intacct/api.py b/source-sage-intacct/source_sage_intacct/api.py
index 1bf4d067b3..e1833172b3 100644
--- a/source-sage-intacct/source_sage_intacct/api.py
+++ b/source-sage-intacct/source_sage_intacct/api.py
@@ -1,18 +1,20 @@
from datetime import UTC, datetime, timedelta
from logging import Logger
-from typing import Any, AsyncGenerator, Callable, TypeVar, Union
+from typing import Any, AsyncGenerator, Callable, TypeVar
import estuary_cdk.emitted_changes_cache as cache
from estuary_cdk.capture.common import LogCursor, PageCursor
from .models import (
+ CreationRecord,
DeletionRecord,
IncrementalResource,
SnapshotResource,
+ parse_backfill_record,
)
from .sage import Sage
-T = TypeVar("T", IncrementalResource, DeletionRecord)
+T = TypeVar("T", IncrementalResource, CreationRecord, DeletionRecord)
# This function is kind of intense, but what it's doing is repeatedly issuing
@@ -57,20 +59,21 @@ async def _fetch_records_generic(
this_count = 0
async for rec in fetch_since(object, cursor):
rec = cls.model_validate(rec.model_dump())
- if last_record_ts and rec.WHENMODIFIED != last_record_ts:
+ rec_ts = rec.cursor_value()
+ if last_record_ts and rec_ts != last_record_ts:
# Got a timestamp change, which means the timestamp of the prior
# record has been fully read.
last_completed_ts = last_record_ts
- if horizon and rec.WHENMODIFIED > now - horizon:
+ if horizon and rec_ts > now - horizon:
# Stop short if the record is too recent. This will trigger the
# "read fewer documents than the page size" condition a little
# further down.
break
this_count += 1
- last_record_ts = rec.WHENMODIFIED
- if cache.should_yield(object, getattr(rec, id_field), rec.WHENMODIFIED):
+ last_record_ts = rec_ts
+ if cache.should_yield(object, getattr(rec, id_field), rec_ts):
yield rec
total_count += this_count
@@ -102,7 +105,7 @@ async def _fetch_records_generic(
rec = cls.model_validate(rec.model_dump())
last_record_identifier = getattr(rec, id_field)
this_count += 1
- if cache.should_yield(object, getattr(rec, id_field), rec.WHENMODIFIED):
+ if cache.should_yield(object, getattr(rec, id_field), rec.cursor_value()):
yield rec
if this_count < page_size:
@@ -133,6 +136,27 @@ async def fetch_changes(
yield item
+async def fetch_creations(
+ object: str,
+ sage: Sage,
+ horizon: timedelta | None,
+ page_size: int,
+ log: Logger,
+ cursor: LogCursor,
+) -> AsyncGenerator[CreationRecord | LogCursor, None]:
+ async for item in _fetch_records_generic(
+ object=object,
+ cls=CreationRecord,
+ id_field="RECORDNO",
+ horizon=horizon,
+ page_size=page_size,
+ cursor=cursor,
+ fetch_since=sage.fetch_created_since,
+ fetch_at=sage.fetch_created_at,
+ ):
+ yield item
+
+
async def fetch_deletions(
object: str,
sage: Sage,
@@ -164,18 +188,18 @@ async def fetch_page(
log: Logger,
page: PageCursor,
cutoff: LogCursor,
-) -> AsyncGenerator[IncrementalResource | PageCursor, None]:
+) -> AsyncGenerator[IncrementalResource | CreationRecord | PageCursor, None]:
assert isinstance(page, int | None)
assert isinstance(cutoff, datetime)
last: int | None = None
count = 0
async for rec in sage.fetch_all(object, page):
- rec = IncrementalResource.model_validate(rec.model_dump())
- last = rec.RECORDNO
+ doc = parse_backfill_record(rec.model_dump())
+ last = doc.RECORDNO
count += 1
- if rec.WHENMODIFIED < cutoff:
- yield rec
+ if doc.cursor_value() < cutoff:
+ yield doc
if count < page_size:
return
diff --git a/source-sage-intacct/source_sage_intacct/models.py b/source-sage-intacct/source_sage_intacct/models.py
index 09e12e2140..b0f71175f5 100644
--- a/source-sage-intacct/source_sage_intacct/models.py
+++ b/source-sage-intacct/source_sage_intacct/models.py
@@ -85,7 +85,13 @@ def __str__(self) -> str:
return " - ".join(parts) if parts else "unspecified Sage Intacct error"
def is_permission_error(self) -> bool:
- return self.errorno == "PL04000005"
+ if self.errorno == "PL04000005":
+ return True
+ # AUDITHISTORY denials arrive without an `errorno`, so fall
+ # back to matching the literal description Sage emits.
+ if self.description2 and "do not have permission to view audit history" in self.description2.lower():
+ return True
+ return False
error: list[Error] | Error
@@ -194,10 +200,40 @@ class SnapshotResource(BaseDocument, extra="allow"):
RECORDNO: Optional[int] = Field(default=None, exclude=True)
-class IncrementalResource(BaseDocument, extra="allow"):
+# Document is the model used to derive the collection's write schema for
+# incremental bindings. Only RECORDNO is required because the three runtime
+# document shapes diverge on the rest: update docs carry WHENMODIFIED,
+# creation docs carry WHENCREATED without WHENMODIFIED, and deletion docs
+# carry WHENMODIFIED without WHENCREATED. Marking either timestamp required
+# would reject one of those shapes during write-schema validation.
+class Document(BaseDocument, extra="allow"):
RECORDNO: int
+
+
+class IncrementalResource(Document):
WHENMODIFIED: AwareDatetime
+ def cursor_value(self) -> AwareDatetime:
+ return self.WHENMODIFIED
+
+
+# CreationRecord captures records that exist in Sage with a null WHENMODIFIED.
+# They cannot be captured by the WHENMODIFIED-keyed incremental query, so a
+# parallel sub-task keyed on WHENCREATED picks them up.
+class CreationRecord(Document):
+ WHENCREATED: AwareDatetime
+
+ def cursor_value(self) -> AwareDatetime:
+ return self.WHENCREATED
+
+
+def parse_backfill_record(raw: dict) -> "IncrementalResource | CreationRecord":
+ """Routes to IncrementalResource when WHENMODIFIED is present, else
+ CreationRecord."""
+ if raw.get("WHENMODIFIED") is not None:
+ return IncrementalResource.model_validate(raw)
+ return CreationRecord.model_validate(raw)
+
class DeletionRecord(BaseDocument, extra="forbid"):
RECORDNO: int = Field(alias="OBJECTKEY", serialization_alias="RECORDNO")
@@ -208,6 +244,9 @@ class DeletionRecord(BaseDocument, extra="forbid"):
default_factory=lambda: DeletionRecord.Meta(op="d")
)
+ def cursor_value(self) -> AwareDatetime:
+ return self.WHENMODIFIED
+
@field_validator("RECORDNO", mode="before")
@classmethod
def parse_object_key(cls, value: str) -> int:
diff --git a/source-sage-intacct/source_sage_intacct/resources.py b/source-sage-intacct/source_sage_intacct/resources.py
index 49a523fed6..b26ab1c3d2 100644
--- a/source-sage-intacct/source_sage_intacct/resources.py
+++ b/source-sage-intacct/source_sage_intacct/resources.py
@@ -9,14 +9,15 @@
from .api import (
fetch_changes,
+ fetch_creations,
fetch_deletions,
fetch_page,
snapshot,
)
from .models import (
ConnectorState,
+ Document,
EndpointConfig,
- IncrementalResource,
ResourceConfig,
ResourceState,
SagePermissionError,
@@ -24,6 +25,21 @@
)
from .sage import PAGE_SIZE, Sage
+# REALTIME_LAG defers the "realtime" modified sub-task by this much so that
+# a near-simultaneous create-then-modify can't race the "realtime_creations"
+# sub-task into emitting an older creation record after a newer
+# modification record has already been written.
+REALTIME_LAG = timedelta(minutes=2)
+LOOKBACK_LAG = timedelta(hours=2)
+LOOKBACK_LAG_DELETIONS = timedelta(hours=2, minutes=30)
+
+REALTIME = "realtime"
+LOOKBACK = "lookback"
+REALTIME_CREATIONS = "realtime_creations"
+LOOKBACK_CREATIONS = "lookback_creations"
+REALTIME_DELETIONS = "realtime_deletions"
+LOOKBACK_DELETIONS = "lookback_deletions"
+
# INCREMENTAL_OBJECTS are those that have a WHENMODIFIED timestamp and support
# queries that filter and sort by that.
INCREMENTAL_OBJECTS = {
@@ -52,7 +68,7 @@
async def incremental_resource(
- sage: Sage, obj: str, started_at: datetime
+ sage: Sage, obj: str, started_at: datetime, can_read_audit_history: bool
) -> common.Resource:
model = await sage.get_model(obj)
@@ -64,57 +80,107 @@ async def open(
all_bindings,
):
task.sourced_schema(binding_index, model.sourced_schema())
- await task.checkpoint(state=ConnectorState())
+
+ # Add state to existing captures that were created before the
+ # realtime_creations / lookback_creations sub-tasks were added.
+ if (
+ isinstance(state.inc, dict)
+ and REALTIME_CREATIONS not in state.inc
+ and isinstance(existing := state.inc.get(REALTIME), ResourceState.Incremental)
+ and isinstance(existing.cursor, datetime)
+ ):
+ current_cursor = existing.cursor
+ creation_state: dict[str, ResourceState.Incremental] = {
+ REALTIME_CREATIONS: ResourceState.Incremental(cursor=current_cursor),
+ LOOKBACK_CREATIONS: ResourceState.Incremental(cursor=current_cursor - LOOKBACK_LAG),
+ }
+ state.inc = {**state.inc, **creation_state}
+
+ task.log.info(
+ "Adding creation subtask state to incremental state.",
+ {"state.inc": state.inc},
+ )
+ await task.checkpoint(
+ ConnectorState(bindingStateV1={binding.stateKey: ResourceState(inc=creation_state)}),
+ )
+ else:
+ await task.checkpoint(state=ConnectorState())
+
+ # There are up to 6 separate subtasks for an incremental resource: 2
+ # for capturing updated documents, 2 for capturing newly-created
+ # records that haven't otherwise been modified, and 2 for capturing
+ # deletions. It is known that the Sage Intacct API is eventually
+ # consistent, so it is likely that the "realtime" subtasks will
+ # occasionally miss change events, and the "lookback" subtasks will
+ # follow behind to true-up the collection.
+ #
+ # The "realtime" modified subtask has a small REALTIME_LAG horizon.
+ # This guards against a race where a record is created with null
+ # WHENMODIFIED and then modified within seconds: without the lag, the
+ # modified subtask could emit the post-modification document before
+ # the creations subtask emits the pre-modification document, and the
+ # older document would overwrite the newer one. The lag gives the
+ # creations subtask time to poll and emit first.
+ #
+ # Capturing deletions is done in a separate subtask from creates &
+ # updates, since it requires using a different API for polling the
+ # audit history object. Note that the "horizon" for the deletions
+ # lookback subtask is 2.5 hours instead of 2 hours - this is to
+ # mitigate races where a record is created or updated and then
+ # immediately deleted, which could otherwise cause the deletion
+ # document to get captured before the create/update document.
+ # Delaying the deletions lookback by an additional amount of time
+ # should prevent such an out-of-order scenario.
+ subtasks = {
+ REALTIME: functools.partial(
+ fetch_changes, obj, sage, REALTIME_LAG, PAGE_SIZE
+ ),
+ LOOKBACK: functools.partial(
+ fetch_changes, obj, sage, LOOKBACK_LAG, PAGE_SIZE
+ ),
+ REALTIME_CREATIONS: functools.partial(
+ fetch_creations, obj, sage, None, PAGE_SIZE
+ ),
+ LOOKBACK_CREATIONS: functools.partial(
+ fetch_creations, obj, sage, LOOKBACK_LAG, PAGE_SIZE
+ ),
+ }
+ # The deletion subtasks are only wired up when the configured user
+ # has permission to read AUDITHISTORY. When the permission is
+ # missing, their state entries are still seeded in initial_state so
+ # that if the permission is granted later, the next connector
+ # restart will reattach the deletion subtasks to their preserved
+ # cursors and pick up deletions that occurred in the meantime.
+ if can_read_audit_history:
+ subtasks[REALTIME_DELETIONS] = functools.partial(
+ fetch_deletions, obj, sage, None, PAGE_SIZE
+ )
+ subtasks[LOOKBACK_DELETIONS] = functools.partial(
+ fetch_deletions, obj, sage, LOOKBACK_LAG_DELETIONS, PAGE_SIZE
+ )
common.open_binding(
binding,
binding_index,
state,
task,
- fetch_changes={
- # There are 4 separate subtasks for an incremental resource: 2
- # for capturing created & updated documents, and 2 for capturing
- # deletions. It is known that the Sage Intacct API is eventually
- # consistent, so it is likely that the "realtime" subtasks will
- # occasionally miss change events, and the "lookback" subtasks
- # will follow behind to true-up the collection.
- #
- # Capturing deletions is done in a separate subtask than creates
- # & updates, since it requires using a different API for polling
- # the audit history object. Note that the "horizon" for the
- # deletions lookback subtask is 2.5 hours instead of 2 hours -
- # this is to mitigate races where a record is created or updated
- # and then immediately deleted, which could otherwise cause the
- # deletion document to get captured before the create/update
- # document. Delaying the deletions lookback by an additional
- # amount of time should prevent such an out-of-order scenario.
- "realtime": functools.partial(
- fetch_changes, obj, sage, None, PAGE_SIZE
- ),
- "lookback": functools.partial(
- fetch_changes, obj, sage, timedelta(hours=2), PAGE_SIZE
- ),
- "realtime_deletions": functools.partial(
- fetch_deletions, obj, sage, None, PAGE_SIZE
- ),
- "lookback_deletions": functools.partial(
- fetch_deletions, obj, sage, timedelta(hours=2.5), PAGE_SIZE
- ),
- },
+ fetch_changes=subtasks,
fetch_page=functools.partial(fetch_page, obj, sage, PAGE_SIZE),
)
return common.Resource(
name=obj,
key=["/RECORDNO"],
- model=IncrementalResource,
+ model=Document,
open=open,
initial_state=ResourceState(
inc={
- "realtime": ResourceState.Incremental(cursor=started_at),
- "lookback": ResourceState.Incremental(cursor=started_at),
- "realtime_deletions": ResourceState.Incremental(cursor=started_at),
- "lookback_deletions": ResourceState.Incremental(cursor=started_at),
+ REALTIME: ResourceState.Incremental(cursor=started_at),
+ LOOKBACK: ResourceState.Incremental(cursor=started_at),
+ REALTIME_CREATIONS: ResourceState.Incremental(cursor=started_at),
+ LOOKBACK_CREATIONS: ResourceState.Incremental(cursor=started_at),
+ REALTIME_DELETIONS: ResourceState.Incremental(cursor=started_at),
+ LOOKBACK_DELETIONS: ResourceState.Incremental(cursor=started_at),
},
backfill=ResourceState.Backfill(next_page=None, cutoff=started_at),
),
@@ -183,9 +249,18 @@ async def _probe(obj: str) -> tuple[str, str | None]:
if denial is not None:
log.warning(f"omitting binding for {obj}: {denial}")
+ can_read_audit_history = True
+ try:
+ await sage.probe_query_permission("AUDITHISTORY")
+ except SagePermissionError as e:
+ can_read_audit_history = False
+ log.warning(
+ f"omitting deletion sub-tasks since AUDITHISTORY cannot be read: {e}. "
+ )
+
started_at = datetime.now(tz=UTC)
inc = [
- incremental_resource(sage, obj, started_at)
+ incremental_resource(sage, obj, started_at, can_read_audit_history)
for obj in INCREMENTAL_OBJECTS
if obj in accessible_objects
]
diff --git a/source-sage-intacct/source_sage_intacct/sage.py b/source-sage-intacct/source_sage_intacct/sage.py
index da8d4a57a1..cb7996c812 100644
--- a/source-sage-intacct/source_sage_intacct/sage.py
+++ b/source-sage-intacct/source_sage_intacct/sage.py
@@ -278,6 +278,33 @@ async def fetch_all(
async for rec in self._req_json(model, data):
yield rec
+ async def fetch_created_since(
+ self, obj: str, since: AwareDatetime
+ ) -> AsyncGenerator[SageRecord, None]:
+ model = await self.get_model(obj)
+ formatted_since = since.astimezone(model.tz_dt.tzinfo).strftime(
+ "%m/%d/%Y %H:%M:%S"
+ )
+ data = get_creations_since_request(
+ self.config, self.session_id, obj, model.field_names, formatted_since
+ )
+ async for rec in self._req_json(model, data):
+ yield rec
+
+ async def fetch_created_at(
+ self,
+ obj: str,
+ at: AwareDatetime,
+ after: int | None,
+ ) -> AsyncGenerator[SageRecord, None]:
+ model = await self.get_model(obj)
+ formatted_at = at.astimezone(model.tz_dt.tzinfo).strftime("%m/%d/%Y %H:%M:%S")
+ data = get_creations_at_request(
+ self.config, self.session_id, obj, model.field_names, formatted_at, after
+ )
+ async for rec in self._req_json(model, data):
+ yield rec
+
async def fetch_deleted(
self, obj: str, since: AwareDatetime
) -> AsyncGenerator[SageRecord, None]:
@@ -688,6 +715,96 @@ def filter() -> str:
return function_with_session_id_xml(cfg, session_id, exec)
+def get_creations_since_request(
+ cfg: EndpointConfig,
+ session_id: str,
+ object: str,
+ fields: list[str],
+ since: str, # ex: 12/08/2024 10:46:26
+) -> str:
+ exec = f"""
+
+
+
+
+
+
+ WHENMODIFIED
+
+
+ WHENCREATED
+ {since}
+
+
+
+
+
+ WHENCREATED
+
+
+
+
+ json
+
+ {PAGE_SIZE}
+
+""".strip()
+
+ return function_with_session_id_xml(cfg, session_id, exec)
+
+
+def get_creations_at_request(
+ cfg: EndpointConfig,
+ session_id: str,
+ object: str,
+ fields: list[str],
+ at: str, # ex: 12/08/2024 10:46:26
+ after: int | None,
+) -> str:
+ def filter() -> str:
+ if after is None:
+ return ""
+ return f"""
+
+ RECORDNO
+ {after}
+ """
+
+ exec = f"""
+
+
+
+
+
+
+ WHENMODIFIED
+
+
+ WHENCREATED
+ {at}
+ {filter()}
+
+
+
+
+ RECORDNO
+
+
+
+
+ json
+
+ {PAGE_SIZE}
+
+""".strip()
+
+ return function_with_session_id_xml(cfg, session_id, exec)
+
+
def get_deletions_since_request(
cfg: EndpointConfig,
session_id: str,
diff --git a/source-sage-intacct/tests/snapshots/xml_requests__xml_requests_get_creations_at_request_txt_get_creations_at_request_args13__get_creations_at_request.txt b/source-sage-intacct/tests/snapshots/xml_requests__xml_requests_get_creations_at_request_txt_get_creations_at_request_args13__get_creations_at_request.txt
new file mode 100644
index 0000000000..e63d766e6f
--- /dev/null
+++ b/source-sage-intacct/tests/snapshots/xml_requests__xml_requests_get_creations_at_request_txt_get_creations_at_request_args13__get_creations_at_request.txt
@@ -0,0 +1,53 @@
+
+
+
+ sender_id
+ sender_password
+ CONTROLID_PLACEHOLDER
+ false
+ 3.0
+ false
+
+
+
+ some_session_id
+
+
+
+
+
+
+
+
+
+ WHENMODIFIED
+
+
+ WHENCREATED
+ 2023-01-02 03:04:56
+
+
+ RECORDNO
+ 1234
+
+
+
+
+
+ RECORDNO
+
+
+
+
+ json
+
+ 2000
+
+
+
+
+
\ No newline at end of file
diff --git a/source-sage-intacct/tests/snapshots/xml_requests__xml_requests_get_creations_at_request_txt_get_creations_at_request_args14__get_creations_at_request.txt b/source-sage-intacct/tests/snapshots/xml_requests__xml_requests_get_creations_at_request_txt_get_creations_at_request_args14__get_creations_at_request.txt
new file mode 100644
index 0000000000..cded427dfc
--- /dev/null
+++ b/source-sage-intacct/tests/snapshots/xml_requests__xml_requests_get_creations_at_request_txt_get_creations_at_request_args14__get_creations_at_request.txt
@@ -0,0 +1,49 @@
+
+
+
+ sender_id
+ sender_password
+ CONTROLID_PLACEHOLDER
+ false
+ 3.0
+ false
+
+
+
+ some_session_id
+
+
+
+
+
+
+
+
+
+ WHENMODIFIED
+
+
+ WHENCREATED
+ 2023-01-02 03:04:56
+
+
+
+
+
+ RECORDNO
+
+
+
+
+ json
+
+ 2000
+
+
+
+
+
\ No newline at end of file
diff --git a/source-sage-intacct/tests/snapshots/xml_requests__xml_requests_get_creations_since_request_txt_get_creations_since_request_args12__get_creations_since_request.txt b/source-sage-intacct/tests/snapshots/xml_requests__xml_requests_get_creations_since_request_txt_get_creations_since_request_args12__get_creations_since_request.txt
new file mode 100644
index 0000000000..99d8db9443
--- /dev/null
+++ b/source-sage-intacct/tests/snapshots/xml_requests__xml_requests_get_creations_since_request_txt_get_creations_since_request_args12__get_creations_since_request.txt
@@ -0,0 +1,49 @@
+
+
+
+ sender_id
+ sender_password
+ CONTROLID_PLACEHOLDER
+ false
+ 3.0
+ false
+
+
+
+ some_session_id
+
+
+
+
+
+
+
+
+
+ WHENMODIFIED
+
+
+ WHENCREATED
+ 2023-01-02 03:04:56
+
+
+
+
+
+ WHENCREATED
+
+
+
+
+ json
+
+ 2000
+
+
+
+
+
\ No newline at end of file
diff --git a/source-sage-intacct/tests/test_api.py b/source-sage-intacct/tests/test_api.py
index 132758f70a..82829d2919 100644
--- a/source-sage-intacct/tests/test_api.py
+++ b/source-sage-intacct/tests/test_api.py
@@ -7,8 +7,9 @@
import pytest
from pydantic import AwareDatetime, model_validator
-from source_sage_intacct.api import fetch_changes
+from source_sage_intacct.api import fetch_changes, fetch_creations, fetch_page
from source_sage_intacct.models import (
+ CreationRecord,
IncrementalResource,
)
from source_sage_intacct.sage import PAGE_SIZE, Sage, SageRecord
@@ -31,6 +32,31 @@ def _normalize_values(cls, values: dict[str, Any]) -> dict[str, Any]:
return values
+class MockCreationRecord(SageRecord):
+ RECORDNO: int
+ WHENCREATED: AwareDatetime
+
+ @model_validator(mode="before")
+ @classmethod
+ def _normalize_values(cls, values: dict[str, Any]) -> dict[str, Any]:
+ return values
+
+
+class MockMixedRecord(SageRecord):
+ """Mirrors a real Sage record where WHENMODIFIED may be null. Used to
+ exercise `fetch_page`'s per-record routing between IncrementalResource
+ and CreationRecord."""
+
+ RECORDNO: int
+ WHENMODIFIED: AwareDatetime | None = None
+ WHENCREATED: AwareDatetime
+
+ @model_validator(mode="before")
+ @classmethod
+ def _normalize_values(cls, values: dict[str, Any]) -> dict[str, Any]:
+ return values
+
+
@pytest.fixture
def mock_sage():
sage = AsyncMock(spec=Sage)
@@ -364,3 +390,75 @@ async def test_fetch_changes_single_page_cycle(mock_sage, mock_logger):
IncrementalResource(RECORDNO=3, WHENMODIFIED=datetime(2023, 1, 2, tzinfo=UTC)),
datetime(2023, 1, 2, tzinfo=UTC),
]
+
+
+@pytest.mark.asyncio
+async def test_fetch_creations_basic(mock_sage, mock_logger):
+ cursor = datetime(2023, 1, 1, tzinfo=UTC)
+ records = [
+ MockCreationRecord(RECORDNO=1, WHENCREATED=datetime(2023, 1, 2, tzinfo=UTC)),
+ MockCreationRecord(RECORDNO=2, WHENCREATED=datetime(2023, 1, 3, tzinfo=UTC)),
+ ]
+ mock_sage.fetch_created_since.return_value = async_iter(records)
+
+ results = [
+ item
+ async for item in fetch_creations(
+ "customer", mock_sage, None, PAGE_SIZE, mock_logger, cursor
+ )
+ ]
+
+ assert mock_sage.fetch_created_since.call_count == 1
+ assert mock_sage.fetch_created_since.mock_calls == [call("customer", cursor)]
+ assert mock_sage.fetch_created_at.call_count == 0
+
+ assert results == [
+ CreationRecord(RECORDNO=1, WHENCREATED=datetime(2023, 1, 2, tzinfo=UTC)),
+ CreationRecord(RECORDNO=2, WHENCREATED=datetime(2023, 1, 3, tzinfo=UTC)),
+ datetime(2023, 1, 3, tzinfo=UTC),
+ ]
+
+
+@pytest.mark.asyncio
+async def test_fetch_page_routes_by_whenmodified(mock_sage, mock_logger):
+ # fetch_page must route records with WHENMODIFIED to IncrementalResource
+ # and records without WHENMODIFIED to CreationRecord. The cutoff
+ # comparison uses cursor_value(), which falls back to WHENCREATED when
+ # WHENMODIFIED is absent.
+ cutoff = datetime(2024, 1, 1, tzinfo=UTC)
+ records = [
+ MockMixedRecord(
+ RECORDNO=1,
+ WHENMODIFIED=datetime(2023, 6, 1, tzinfo=UTC),
+ WHENCREATED=datetime(2023, 1, 1, tzinfo=UTC),
+ ),
+ MockMixedRecord(
+ RECORDNO=2,
+ WHENMODIFIED=None,
+ WHENCREATED=datetime(2023, 7, 1, tzinfo=UTC),
+ ),
+ # This record is after the cutoff (via its WHENCREATED) and should
+ # be skipped — leaves the door open for the incremental sub-tasks
+ # to capture it.
+ MockMixedRecord(
+ RECORDNO=3,
+ WHENMODIFIED=None,
+ WHENCREATED=datetime(2024, 6, 1, tzinfo=UTC),
+ ),
+ ]
+ mock_sage.fetch_all.return_value = async_iter(records)
+
+ results = [
+ item
+ async for item in fetch_page(
+ "customer", mock_sage, PAGE_SIZE, mock_logger, None, cutoff
+ )
+ ]
+
+ assert len(results) == 2
+ assert isinstance(results[0], IncrementalResource)
+ assert results[0].RECORDNO == 1
+ assert results[0].WHENMODIFIED == datetime(2023, 6, 1, tzinfo=UTC)
+ assert isinstance(results[1], CreationRecord)
+ assert results[1].RECORDNO == 2
+ assert results[1].WHENCREATED == datetime(2023, 7, 1, tzinfo=UTC)
diff --git a/source-sage-intacct/tests/test_xml_requests.py b/source-sage-intacct/tests/test_xml_requests.py
index dd05f98616..088e616354 100644
--- a/source-sage-intacct/tests/test_xml_requests.py
+++ b/source-sage-intacct/tests/test_xml_requests.py
@@ -78,6 +78,21 @@
get_deletions_at_request,
(cfg, session_id, object_name, ts, None),
),
+ (
+ "get_creations_since_request.txt",
+ get_creations_since_request,
+ (cfg, session_id, object_name, fields, ts),
+ ),
+ (
+ "get_creations_at_request.txt",
+ get_creations_at_request,
+ (cfg, session_id, object_name, fields, ts, 1234),
+ ),
+ (
+ "get_creations_at_request.txt",
+ get_creations_at_request,
+ (cfg, session_id, object_name, fields, ts, None),
+ ),
],
)
def test_xml_requests(snapshot, name, func, args):