πŸš¨πŸ› Source Posthog: Require single project selection #50848

**Open** · wants to merge 5 commits into `master`
In the connector's `metadata.yaml`, the image tag is bumped to a new major version:

```diff
@@ -9,7 +9,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: af6d50ee-dddf-4126-a8ee-7faee990774f
-  dockerImageTag: 1.1.21
+  dockerImageTag: 2.0.0
   dockerRepository: airbyte/source-posthog
   documentationUrl: https://docs.airbyte.com/integrations/sources/posthog
   githubIssueLabel: source-posthog
```
A new configured catalog file for the `events` stream:

```diff
@@ -0,0 +1,19 @@
+{
+  "streams": [
+    {
+      "stream": {
+        "name": "events",
+        "json_schema": {},
+        "supported_sync_modes": ["full_refresh", "incremental"],
+        "source_defined_cursor": true,
+        "default_cursor_field": ["timestamp"],
+        "source_defined_primary_key": [["id"]],
+        "namespace": null
+      },
+      "sync_mode": "incremental",
+      "cursor_field": null,
+      "destination_sync_mode": "append",
+      "primary_key": null
+    }
+  ]
+}
```
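A configured catalog like this can be sanity-checked with a few lines of plain Python: the chosen `sync_mode` must be one of the stream's `supported_sync_modes`. A minimal standalone sketch (not the Airbyte validator):

```python
import json

# Trimmed copy of the configured catalog above.
catalog = json.loads("""
{
  "streams": [
    {
      "stream": {
        "name": "events",
        "supported_sync_modes": ["full_refresh", "incremental"],
        "default_cursor_field": ["timestamp"]
      },
      "sync_mode": "incremental",
      "destination_sync_mode": "append"
    }
  ]
}
""")

def check_catalog(catalog: dict) -> None:
    """Raise if a configured sync mode is not supported by its stream."""
    for configured in catalog["streams"]:
        stream, mode = configured["stream"], configured["sync_mode"]
        if mode not in stream["supported_sync_modes"]:
            raise ValueError(f"{stream['name']}: unsupported sync_mode {mode!r}")

check_catalog(catalog)
print("catalog OK")  # prints: catalog OK
```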
In `source_posthog/components.py`, the import block gains `urllib.parse` helpers and the CDK's `HttpRequester`:

```diff
@@ -4,8 +4,10 @@

 from dataclasses import dataclass
 from typing import Any, Iterable, Mapping, MutableMapping, Optional
+from urllib.parse import parse_qs, urlparse

 from airbyte_cdk.sources.declarative.incremental import Cursor
+from airbyte_cdk.sources.declarative.requesters import HttpRequester
 from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
 from airbyte_cdk.sources.declarative.stream_slicers import CartesianProductStreamSlicer
 from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
```
The per-project `EventsCartesianProductStreamSlicer` is removed and replaced with a slim `EventsHttpRequester`:

```diff
@@ -56,94 +58,37 @@ def request_params(


 @dataclass
-class EventsCartesianProductStreamSlicer(Cursor, CartesianProductStreamSlicer):
-    """Connector requires support of nested state - each project should have own timestamp value, like:
-    {
-        "project_id1": {
-            "timestamp": "2021-02-01T10:21:35.003000Z"
-        },
-        "project_idX": {
-            "timestamp": "2022-11-17:00:00.000000Z"
-        }
-    }
-    we also have to support old-style (before 0.1.8) states, like:
-    {
-        "timestamp": "2021-17-01T10:21:35.003000Z"
-    }
-
-    Slicer also produces separate datetime slices for each project
-    """
-
-    def __post_init__(self, parameters: Mapping[str, Any]):
-        self._cursor = {}
-        self._parameters = parameters
-
-    def get_stream_state(self) -> Mapping[str, Any]:
-        return self._cursor or {}
-
-    def set_initial_state(self, stream_state: StreamState) -> None:
-        self._cursor = stream_state
-
-    def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
-        project_id = str(stream_slice.get("project_id", ""))
-        if project_id and most_recent_record:
-            current_cursor_value = self._cursor.get(project_id, {}).get("timestamp", "")
-            new_cursor_value = most_recent_record.get("timestamp", "")
-
-            self._cursor[project_id] = {"timestamp": max(current_cursor_value, new_cursor_value)}
-
-    def stream_slices(self) -> Iterable[StreamSlice]:
-        """Since each project has its own state, then we need to have a separate
-        datetime slices for each project
-        """
-
-        slices = []
-
-        project_slicer, datetime_slicer = self.stream_slicers
-
-        # support of old style state: it contains only a single 'timestamp' field
-        old_style_state = self._cursor if "timestamp" in self._cursor else {}
-
-        for project_slice in project_slicer.stream_slices():
-            project_id = str(project_slice.get("project_id", ""))
-
-            # use old_style_state if state does not contain states for each project
-            project_state = self._cursor.get(project_id, {}) or old_style_state
-
-            # Each project should have own datetime slices depends on its state
-            datetime_slicer.set_initial_state(project_state)
-            project_datetime_slices = datetime_slicer.stream_slices()
-
-            # fix date ranges: start_time of next slice must be equal to end_time of previous slice
-            if project_datetime_slices and project_state:
-                project_datetime_slices[0]["start_time"] = project_state["timestamp"]
-                for i, datetime_slice in enumerate(project_datetime_slices[1:], start=1):
-                    datetime_slice["start_time"] = project_datetime_slices[i - 1]["end_time"]
-
-            # Add project id to each slice
-            for datetime_slice in project_datetime_slices:
-                datetime_slice["project_id"] = project_id
-
-            slices.extend(project_datetime_slices)
-
-        return slices
-
-    def should_be_synced(self, record: Record) -> bool:
-        """
-        As of 2023-06-28, the expectation is that this method will only be used for semi-incremental and data feed and therefore the
-        implementation is irrelevant for posthog
-        """
-        return True
-
-    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
-        """
-        Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
-        """
-        first_cursor_value = first.get("timestamp")
-        second_cursor_value = second.get("timestamp")
-        if first_cursor_value and second_cursor_value:
-            return first_cursor_value >= second_cursor_value
-        elif first_cursor_value:
-            return True
-        else:
-            return False
+class EventsHttpRequester(HttpRequester):
+    def _request_params(
+        self,
+        stream_state: Optional[StreamState],
+        stream_slice: Optional[StreamSlice],
+        next_page_token: Optional[Mapping[str, Any]],
+        extra_params: Optional[Mapping[str, Any]] = None,
+    ) -> Mapping[str, Any]:
+        """
+        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
+        """
+        if next_page_token is not None:
+            url = next_page_token["next_page_token"]
+            parsed_url = urlparse(url)
+            options = dict((k, v[0] if isinstance(v, list) else v) for k, v in parse_qs(parsed_url.query).items())
+        else:
+            options = self._get_request_options(
+                stream_state,
+                stream_slice,
+                next_page_token,
+                self.get_request_params,
+                self.get_authenticator().get_request_params,
+                extra_params,
+            )
+
+        if isinstance(options, str):
+            raise ValueError("Request params cannot be a string")
+
+        for k, v in options.items():
+            if isinstance(v, (list, dict)):
+                raise ValueError(f"Invalid value for `{k}` parameter. The values of request params cannot be an array or object.")
+
+        return options
```
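The heart of `EventsHttpRequester` is the branch that rebuilds request parameters from the paginator's `next` URL. That parsing step can be exercised on its own with just the standard library (the sample URL is illustrative, not taken from a real response):

```python
from urllib.parse import parse_qs, urlparse

def params_from_next_url(url: str) -> dict:
    """Rebuild a flat params dict from a paginated `next` URL.

    parse_qs returns every value as a list, so single values are
    unwrapped, the same normalization _request_params performs.
    """
    parsed = urlparse(url)
    return {k: v[0] if isinstance(v, list) else v for k, v in parse_qs(parsed.query).items()}

next_url = "https://app.posthog.com/api/projects/1/events/?after=2024-01-01T00%3A00%3A00%2B00%3A00&limit=10000"
print(params_from_next_url(next_url))
# prints: {'after': '2024-01-01T00:00:00+00:00', 'limit': '10000'}
```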
In `manifest.yaml`, the base requester now targets the single configured project:

```diff
@@ -13,6 +13,7 @@ definitions:
   requester:
     type: HttpRequester
     url_base: "{{ config['base_url'] or 'https://app.posthog.com'}}/api/"
+    path: "/projects/{{ config['project_id'] }}/{{ parameters['name'] }}"
    http_method: "GET"
    authenticator:
      type: BearerAuthenticator
```
The `base_slicing_stream` (and its `SubstreamPartitionRouter` over projects) is removed; every stream now derives from `base_stream`:

```diff
@@ -45,51 +46,42 @@
       $ref: "#/definitions/requester"

   projects_stream:
     $ref: "#/definitions/base_stream"
-    $parameters:
-      name: "projects"
-      path: "projects"
-
-  base_slicing_stream:
-    $ref: "#/definitions/base_stream"
     retriever:
       $ref: "#/definitions/retriever"
       requester:
         $ref: "#/definitions/requester"
-        path: "/projects/{{ stream_slice.id }}/{{ parameters['name'] }}"
-      partition_router:
-        type: SubstreamPartitionRouter
-        parent_stream_configs:
-          - stream: "#/definitions/projects_stream"
-            parent_key: id
-            partition_field: id
+        path: "projects"
+    $parameters:
+      name: "projects"
+      path: "projects"

   cohorts_stream:
-    $ref: "#/definitions/base_slicing_stream"
+    $ref: "#/definitions/base_stream"
     $parameters:
       name: "cohorts"
       path: "cohorts"

   feature_flags_stream:
-    $ref: "#/definitions/base_slicing_stream"
+    $ref: "#/definitions/base_stream"
     $parameters:
       name: "feature_flags"
       path: "feature_flags"

   persons_stream:
-    $ref: "#/definitions/base_slicing_stream"
+    $ref: "#/definitions/base_stream"
     $parameters:
       name: "persons"
       path: "persons"

   annotations_stream:
-    $ref: "#/definitions/base_slicing_stream"
+    $ref: "#/definitions/base_stream"
     $parameters:
       name: "annotations"
       path: "annotations"

   insights_stream:
-    $ref: "#/definitions/base_slicing_stream"
+    $ref: "#/definitions/base_stream"
     $parameters:
       name: "insights"
       path: "insights"
```
The `events` stream drops the custom Cartesian-product slicer in favor of a plain `DatetimeBasedCursor`:

```diff
@@ -100,35 +92,26 @@ definitions:
       path: "events"
     primary_key: "id"
     incremental_sync:
-      type: CustomIncrementalSync
-      class_name: source_posthog.components.EventsCartesianProductStreamSlicer
-      stream_slicers:
-        - type: SubstreamPartitionRouter
-          parent_stream_configs:
-            - stream: "#/definitions/projects_stream"
-              parent_key: "id"
-              partition_field: "project_id"
-        - type: DatetimeBasedCursor
-          start_datetime:
-            datetime: "{{ config['start_date'] }}"
-            datetime_format: "%Y-%m-%dT%H:%M:%S%z"
-          end_datetime:
-            datetime: "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%S%z') }}"
-            datetime_format: "%Y-%m-%dT%H:%M:%S%z"
-          datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
-          cursor_datetime_formats:
-            - "%Y-%m-%dT%H:%M:%S.%f%z"
-            - "%Y-%m-%dT%H:%M:%S+00:00"
-          cursor_granularity: "PT0.000001S"
-          step: "P{{ config.get('events_time_step', 30) }}D"
-          cursor_field: timestamp
-          start_time_option:
-            field_name: after
-            inject_into: request_parameter
-          end_time_option:
-            field_name: before
-            inject_into: request_parameter
+      type: DatetimeBasedCursor
+      start_datetime:
+        datetime: "{{ config['start_date'] }}"
+        datetime_format: "%Y-%m-%dT%H:%M:%S%z"
+      end_datetime:
+        datetime: "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%S%z') }}"
+        datetime_format: "%Y-%m-%dT%H:%M:%S%z"
+      datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
+      cursor_datetime_formats:
+        - "%Y-%m-%dT%H:%M:%S.%f%z"
+        - "%Y-%m-%dT%H:%M:%S+00:00"
+      cursor_granularity: "PT0.000001S"
+      step: "P{{ config.get('events_time_step', 30) }}D"
+      cursor_field: timestamp
+      start_time_option:
+        field_name: after
+        inject_into: request_parameter
+      end_time_option:
+        field_name: before
+        inject_into: request_parameter
     schema_loader:
       $ref: "#/definitions/schema_loader"
     retriever:
```
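The `DatetimeBasedCursor` splits the window between `start_date` and now into `step` chunks (default `P30D`) separated by a `cursor_granularity` of one microsecond. A plain-Python approximation of that slicing behavior (a sketch, not the CDK implementation):

```python
from datetime import datetime, timedelta, timezone

def datetime_slices(start: datetime, end: datetime, step_days: int = 30) -> list:
    """Split [start, end] into consecutive windows of `step_days`,
    separated by a 1-microsecond cursor granularity."""
    slices, cursor = [], start
    while cursor <= end:
        window_end = min(cursor + timedelta(days=step_days) - timedelta(microseconds=1), end)
        slices.append({"start_time": cursor, "end_time": window_end})
        cursor = window_end + timedelta(microseconds=1)
    return slices

slices = datetime_slices(
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 3, 1, tzinfo=timezone.utc),
)
print(len(slices))  # prints: 3
print(slices[0]["end_time"].isoformat())  # prints: 2024-01-30T23:59:59.999999+00:00
```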
The `events` retriever switches to the custom requester and to cursor-based pagination:

```diff
@@ -144,20 +127,25 @@ definitions:
       record_selector:
         $ref: "#/definitions/selector"
       requester:
-        $ref: "#/definitions/requester"
-        path: "/projects/{{ stream_slice.project_id }}/{{ parameters['name'] }}/"
+        type: CustomRequester
+        class_name: source_posthog.components.EventsHttpRequester
+        url_base: "{{ config['base_url'] or 'https://app.posthog.com'}}/api/"
+        http_method: "GET"
+        authenticator:
+          type: BearerAuthenticator
+          api_token: "{{ config['api_key'] }}"
+        path: "/projects/{{ config['project_id'] }}/{{ parameters['name'] }}/"
       paginator:
         type: "DefaultPaginator"
         page_size_option:
           inject_into: "request_parameter"
           field_name: "limit"
         pagination_strategy:
-          type: "OffsetIncrement"
+          type: "CursorPagination"
+          cursor_value: "{{ response['next'] }}"
           page_size: 10000
         page_token_option:
-          type: RequestOption
-          field_name: "offset"
-          inject_into: "request_parameter"
+          type: RequestPath

 streams:
   - "#/definitions/projects_stream"
```
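Swapping `OffsetIncrement` for `CursorPagination` with `page_token_option: RequestPath` means the connector simply follows the `next` URL each response returns. The control flow, reduced to a toy loop (the `fetch_page` callable and URLs here are made up for illustration):

```python
def paginate(fetch_page, first_url: str) -> list:
    """Collect records by following `next` links until they run out."""
    records, url = [], first_url
    while url:
        page = fetch_page(url)        # expects {"results": [...], "next": url_or_None}
        records.extend(page["results"])
        url = page["next"]            # mirrors cursor_value: "{{ response['next'] }}"
    return records

# A fake two-page API standing in for the events endpoint.
pages = {
    "/events?limit=2": {"results": [1, 2], "next": "/events?limit=2&after=2"},
    "/events?limit=2&after=2": {"results": [3, 4], "next": None},
}
print(paginate(pages.__getitem__, "/events?limit=2"))  # prints: [1, 2, 3, 4]
```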
In `spec.json`, `project_id` becomes a required configuration field:

```diff
@@ -4,7 +4,7 @@
   "$schema": "http://json-schema.org/draft-07/schema#",
   "title": "PostHog Spec",
   "type": "object",
-  "required": ["api_key", "start_date"],
+  "required": ["api_key", "project_id", "start_date"],
   "properties": {
     "start_date": {
       "title": "Start Date",
@@ -25,7 +25,13 @@
       "default": "https://app.posthog.com",
       "title": "Base URL",
       "description": "Base PostHog url. Defaults to PostHog Cloud (https://app.posthog.com).",
-      "examples": ["https://posthog.example.com"]
+      "examples": ["https://posthog.example.com", "https://eu.posthog.com"]
+    },
+    "project_id": {
+      "type": "integer",
+      "title": "Project ID",
+      "description": "You can find the project ID in the URL of your PostHog dashboard. Create a separate source for each project.",
+      "examples": ["12345"]
     },
     "events_time_step": {
       "type": "integer",
```
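With `project_id` added to `required`, configs that omit it (or pass the wrong type) should be rejected before any sync starts. A hand-rolled sketch of that check against a trimmed copy of the spec (not Airbyte's actual validator, which uses full JSON Schema validation; the sample values are placeholders):

```python
SPEC_REQUIRED = ["api_key", "project_id", "start_date"]
SPEC_TYPES = {"api_key": str, "project_id": int, "start_date": str}

def validate_config(config: dict) -> list:
    """Return a list of human-readable errors; an empty list means valid."""
    errors = [f"missing required field: {name}" for name in SPEC_REQUIRED if name not in config]
    errors += [
        f"field {name} must be of type {typ.__name__}"
        for name, typ in SPEC_TYPES.items()
        if name in config and not isinstance(config[name], typ)
    ]
    return errors

ok = {"api_key": "placeholder_key", "project_id": 12345, "start_date": "2024-01-01T00:00:00Z"}
print(validate_config(ok))  # prints: []
print(validate_config({"api_key": "placeholder_key", "start_date": "2024-01-01T00:00:00Z"}))
# prints: ['missing required field: project_id']
```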
3 changes: 3 additions & 0 deletions in `docs/integrations/sources/posthog-migrations.md`:

```diff
@@ -3,3 +3,6 @@
 ## Upgrading to 1.0.0

 Version 1.0.0 introduces a single change to the `events` stream. It corrects the casting of the `event` field datatype, which was incorrectly labeled as a `json` object. Now, it is accurately attributed only as a `string`, as outlined in the PostHog [documentation](https://posthog.com/docs/api/events). To apply this change, refresh the schema for the 'events' stream and reset your data.
+
+## Upgrading to 2.0.0
+Version 2.0.0 introduces a required `project_id` parameter to the source configuration. You can find this in your project settings in PostHog, or in the URL. If your PostHog organization has more than one project, a separate source is required for each project.
```
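The migration note says the project ID can be read from the PostHog URL. Assuming the usual `/project/<id>` path segment in PostHog Cloud dashboard URLs (an assumption; verify against your own instance), extraction is a single regex:

```python
import re

def project_id_from_url(url: str):
    """Return the numeric project ID from a dashboard URL, or None.

    Assumes a '/project/<id>' path segment (hypothetical URL shape).
    """
    match = re.search(r"/project/(\d+)", url)
    return int(match.group(1)) if match else None

print(project_id_from_url("https://us.posthog.com/project/12345/activity/explore"))  # prints: 12345
print(project_id_from_url("https://us.posthog.com/settings"))  # prints: None
```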
15 changes: 9 additions & 6 deletions in `docs/integrations/sources/posthog.md`:

```diff
@@ -21,18 +21,20 @@ This page contains the setup guide and reference information for the PostHog sou
 2. In the left navigation bar, click **Sources**. In the top-right corner, click **+new source**.
 3. On the Set up the source page, enter the name for the PostHog connector and select **PostHog** from the Source type dropdown.
 4. Enter your `apikey`.
-5. Enter your `start_date`.
-6. Change default `base_url` if self-hosted posthog instances is used
-7. Click **Set up source**.
+5. Enter your `project_id`.
+6. Enter your `start_date`.
+7. Change the default `base_url` if a self-hosted PostHog instance is used.
+8. Click **Set up source**.

 ### For Airbyte OSS:

 1. Navigate to the Airbyte Open Source dashboard.
 2. Set the name for your source.
 3. Enter your `api_key`.
-4. Enter your `start_date`.
-5. Change default `base_url` if self-hosted posthog instances is used
-6. Click **Set up source**.
+4. Enter your `project_id`.
+5. Enter your `start_date`.
+6. Change the default `base_url` if a self-hosted PostHog instance is used.
+7. Click **Set up source**.

 ## Supported streams and sync modes

@@ -71,6 +73,7 @@ Want to use the PostHog API beyond these limits? Email Posthog at `customers@pos

 | Version | Date       | Pull Request                                             | Subject                                                     |
 | :------ | :--------- | :------------------------------------------------------- | :---------------------------------------------------------- |
+| 2.0.0   | 2025-01-02 | [50848](https://github.com/airbytehq/airbyte/pull/50848) | Require a single `project_id` to be specified in the source |
 | 1.1.21  | 2024-12-28 | [50685](https://github.com/airbytehq/airbyte/pull/50685) | Update dependencies                                         |
 | 1.1.20  | 2024-12-21 | [50280](https://github.com/airbytehq/airbyte/pull/50280) | Update dependencies                                         |
 | 1.1.19  | 2024-12-14 | [49716](https://github.com/airbytehq/airbyte/pull/49716) | Update dependencies                                         |
```