Reading from a subresource
In this section, we'll implement the survey responses stream. Its structure is a little different from the previous stream's because it depends on the surveys stream: responses are fetched per survey.
Start by creating a new base class for substreams:
class SurveyMonkeySubstream(HttpStream, ABC):
    """Full-refresh HTTP stream whose slices are the records of a parent stream.

    Each record read from ``parent_stream`` becomes one stream slice, and the
    request path is rendered from that slice (for example
    ``/v3/surveys/{stream_slice[id]}/responses/``).
    """

    url_base = "https://api.surveymonkey.com"

    def __init__(self, name: str, path: str, primary_key: Union[str, List[str]], parent_stream: Stream, **kwargs: Any) -> None:
        """Store the stream configuration and forward ``kwargs`` to the base class.

        :param name: stream name returned by the ``name`` property.
        :param path: path template; may reference ``stream_slice`` using
            ``str.format`` syntax.
        :param primary_key: primary key returned by the ``primary_key`` property.
        :param parent_stream: stream whose records are used as slices.
        :param kwargs: forwarded unchanged to the base-class initializer.
        """
        # Kept ahead of super().__init__ as in the original ordering --
        # presumably so the base initializer can already read ``name``; confirm
        # before reordering.
        self._name = name
        self._path = path
        self._primary_key = primary_key
        self._parent_stream = parent_stream
        super().__init__(**kwargs)

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return ``{"next_url": ...}`` when the response links to a next page, else ``None``."""
        # ``or {}`` also covers a payload where "links" is present but null.
        links = response.json().get("links") or {}
        if "next" in links:
            return {"next_url": links["next"]}
        return None

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        """Return the query parameters for the next request.

        For follow-up pages the parameters are taken from the query string of
        the ``next_url`` produced by ``next_page_token``; for the first page
        only the page size is sent.
        """
        if next_page_token:
            from urllib.parse import parse_qsl

            # Return a real mapping (as annotated) rather than the raw query string.
            query = urlparse(next_page_token["next_url"]).query
            return dict(parse_qsl(query, keep_blank_values=True))
        return {"per_page": _PAGE_SIZE}

    def parse_response(self, response: requests.Response, **kwargs: Any) -> Iterable[Mapping]:
        """Yield every record found under the ``data`` key of the response body."""
        yield from response.json().get("data", [])

    @property
    def name(self) -> str:
        """Stream name supplied to the constructor."""
        return self._name

    def path(
        self,
        *,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """Render the path template with the current ``stream_slice``.

        Formatting errors (for example a key missing from the slice) propagate
        to the caller unchanged.
        """
        return self._path.format(stream_slice=stream_slice)

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """Primary key supplied to the constructor."""
        return self._primary_key

    def stream_slices(self, stream_state: Optional[Mapping[str, Any]] = None, **kwargs: Any) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield one slice per record of the parent stream, read in full-refresh mode."""
        # ``sync_mode`` is passed explicitly so that parents relying on the
        # standard ``Stream.stream_slices`` signature also work.
        for parent_slice in self._parent_stream.stream_slices(sync_mode=SyncMode.full_refresh):
            yield from self._parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=parent_slice)
This class is similar to the base class, but it does not support incremental reads, and its stream slices are generated by reading records from a parent stream. This is how we'll ensure we always read all survey responses.
Note that using this approach, the connector will checkpoint after reading responses for each survey.
Don't forget to update the `streams` method to also instantiate the survey responses stream:
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """Instantiate the surveys stream and the survey responses substream.

    Both streams share a token authenticator built from
    ``config["access_token"]``; the surveys stream is handed to the substream
    as its parent.
    """
    token_auth = TokenAuthenticator(token=config["access_token"])
    surveys_stream = SurveyMonkeyBaseStream(
        name="surveys",
        path="/v3/surveys",
        primary_key="id",
        data_field="data",
        cursor_field="date_modified",
        authenticator=token_auth,
    )
    responses_stream = SurveyMonkeySubstream(
        name="survey_responses",
        path="/v3/surveys/{stream_slice[id]}/responses/",
        primary_key="id",
        authenticator=token_auth,
        parent_stream=surveys_stream,
    )
    return [surveys_stream, responses_stream]
Before moving on, we'll enable request caching on the surveys stream so its records are not fetched twice: once for the surveys stream itself and once more when the survey responses stream reads them as its parent. You can do this by overriding the `use_cache` property on the `SurveyMonkeyBaseStream` class so that it returns `True`.
@property
def use_cache(self) -> bool:
    """Turn on request caching for this stream by always returning ``True``."""
    return True
Now add the stream to the configured catalog:
{
"streams": [
{
"stream": {
"name": "surveys",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "survey_responses",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Then create a new schema file at `source_survey_monkey_demo/schemas/survey_responses.json`. You can
use the connector builder to generate the schema, or paste the one provided below:
{
"$schema": "http://json-schema.org/schema#",
"properties": {
"analyze_url": {
"type": ["string", "null"]
},
"collect_stats": {
"properties": {
"status": {
"properties": {
"open": {
"type": ["number", "null"]
}
},
"type": ["object", "null"]
},
"total_count": {
"type": ["number", "null"]
},
"type": {
"properties": {
"weblink": {
"type": ["number", "null"]
}
},
"type": ["object", "null"]
}
},
"type": ["object", "null"]
},
"date_created": {
"type": ["string", "null"]
},
"date_modified": {
"type": ["string", "null"]
},
"href": {
"type": ["string", "null"]
},
"id": {
"type": ["string", "null"]
},
"language": {
"type": ["string", "null"]
},
"nickname": {
"type": ["string", "null"]
},
"preview": {
"type": ["string", "null"]
},
"question_count": {
"type": ["number", "null"]
},
"response_count": {
"type": ["number", "null"]
},
"title": {
"type": ["string", "null"]
}
},
"type": "object"
}
You should now be able to read your survey responses:
poetry run source-survey-monkey-demo read --config secrets/config.json --catalog integration_tests/configured_catalog.json
In the next section we'll update the connector so it reads stream slices concurrently.