Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions develop-docs/backend/application-domains/tasks/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ from sentry.taskworker.retry import Retry
retry=Retry(times=3, on=(ConnectionError,)),
processing_deadline_duration=60
)
def do_work(organization_id: int, issue_id: int, **kwargs) -> None:
def do_work(organization_id: int, issue_id: int, **kwargs: Any) -> None:
...
```

Expand All @@ -42,9 +42,12 @@ When defining tasks there are some constraints:
deadline has elapsed, it will be killed by the worker runtime. Tasks that are
killed for execution duration are not automatically retried.

- All tasks must be assigned to a 'namespace'. A namespace is a group of related
- Tasks _must_ be assigned to a 'namespace'. A namespace is a group of related
tasks that are operated together and share a backlog.

- Tasks _should_ take `**kwargs: Any` as their final argument.
Without it, adding parameters is more complex and requires multiple deploys.

- The return value of a task is not stored and ignored by workers.

- The module containing a task _must_ be added to `TASKWORKER_IMPORTS` in
Expand Down Expand Up @@ -96,7 +99,7 @@ is inherited.
namespace=issues_tasks,
retry=Retry(times=3, times_exceeded=LastAction.Deadletter),
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
def deliver_issue_webhook(organization_id: int, group_id: int, **kwargs: Any) -> None:
...
```

Expand All @@ -119,7 +122,7 @@ needs to stagger retries, it can use a delayed retry.
namespace=issues_tasks,
retry=Retry(times=3, on=(IntegrationError, ), delay=30)
)
def fetch_commits(repository_id: int) -> None:
def fetch_commits(repository_id: int, **kwargs: Any) -> None:
...
```

Expand All @@ -142,7 +145,7 @@ saturated by tasks running for unbounded amounts of time.
# Extended from the default 10
processing_deadline_duration=60
)
def fetch_commits(repository_id: int) -> None:
def fetch_commits(repository_id: int, **kwargs: Any) -> None:
...
```

Expand Down Expand Up @@ -172,7 +175,7 @@ skipped if they are stale and their results are no longer relevant.
namespace=issues_tasks,
expires=timedelta("5 minutes"),
)
def deliver_issue_webhook(organization_id: int, group_id: int):
def deliver_issue_webhook(organization_id: int, group_id: int, **kwargs: Any):
...
```

Expand Down Expand Up @@ -206,7 +209,7 @@ execution.
namespace=issues_tasks,
at_most_once=True,
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
def deliver_issue_webhook(organization_id: int, group_id: int, **kwargs: Any) -> None:
...

```
Expand Down Expand Up @@ -289,7 +292,7 @@ With an external namespace you can register and spawn **external tasks**.

```python
@launchpad_tasks.register(name="launchpad.task.name")
def run_process(org_id: int, project_id: int, payload: bytes) -> None:
def run_process(org_id: int, project_id: int, payload: bytes, **kwargs: Any) -> None:
pass


Expand Down Expand Up @@ -360,7 +363,7 @@ issues_tasks = taskregistry.create_namespace(

# register tasks within a namespace
@instrumented_task(name="tasks.do_work", namespace=issues_tasks)
def do_work(**kwargs):
def do_work(**kwargs: Any):
...
```

Expand Down
Loading