Skip to content

Commit df48822

Browse files
fix local development with the Pub/Sub emulator (GoogleCloudPlatform#121)
* fix local development with the Pub/Sub emulator * black formatting fixes * set name to empty string when it cannot be parsed * convert key error to EventConversionException * used payload timestamp if it exists, fix test coverage * remove test that required update to test_function * fix lint error
1 parent cb164fe commit df48822

File tree

3 files changed

+190
-13
lines changed

3 files changed

+190
-13
lines changed

src/functions_framework/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ def view_func(path):
126126
function(data, context)
127127
else:
128128
# This is a regular CloudEvent
129-
event_data = request.get_json()
129+
event_data = event_conversion.marshal_background_event_data(request)
130130
if not event_data:
131131
flask.abort(400)
132132
event_object = BackgroundEvent(**event_data)

src/functions_framework/event_conversion.py

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
# limitations under the License.
1414
import re
1515

16-
from typing import Tuple
16+
from datetime import datetime
17+
from typing import Optional, Tuple
1718

1819
from cloudevents.http import CloudEvent
1920

@@ -55,6 +56,12 @@
5556
_PUBSUB_CE_SERVICE = "pubsub.googleapis.com"
5657
_STORAGE_CE_SERVICE = "storage.googleapis.com"
5758

59+
# Raw pubsub types
60+
_PUBSUB_EVENT_TYPE = "google.pubsub.topic.publish"
61+
_PUBSUB_MESSAGE_TYPE = "type.googleapis.com/google.pubsub.v1.PubsubMessage"
62+
63+
_PUBSUB_TOPIC_REQUEST_PATH = re.compile(r"projects\/[^/?]+\/topics\/[^/?]+")
64+
5865
# Maps background event services to their equivalent CloudEvent services.
5966
_SERVICE_BACKGROUND_TO_CE = {
6067
"providers/cloud.firestore/": _FIRESTORE_CE_SERVICE,
@@ -89,8 +96,8 @@
8996

9097

9198
def background_event_to_cloudevent(request) -> CloudEvent:
92-
"""Converts a background event represented by the given HTTP request into a CloudEvent. """
93-
event_data = request.get_json()
99+
"""Converts a background event represented by the given HTTP request into a CloudEvent."""
100+
event_data = marshal_background_event_data(request)
94101
if not event_data:
95102
raise EventConversionException("Failed to parse JSON")
96103

@@ -168,3 +175,56 @@ def _split_resource(context: Context) -> Tuple[str, str, str]:
168175
raise EventConversionException("Resource regex did not match")
169176

170177
return service, match.group(1), match.group(2)
178+
179+
180+
def marshal_background_event_data(request):
181+
"""Marshal the request body of a raw Pub/Sub HTTP request into the schema that is expected of
182+
a background event"""
183+
try:
184+
request_data = request.get_json()
185+
if not _is_raw_pubsub_payload(request_data):
186+
# If this in not a raw Pub/Sub request, return the unaltered request data.
187+
return request_data
188+
return {
189+
"context": {
190+
"eventId": request_data["message"]["messageId"],
191+
"timestamp": request_data["message"].get(
192+
"publishTime", datetime.utcnow().isoformat() + "Z"
193+
),
194+
"eventType": _PUBSUB_EVENT_TYPE,
195+
"resource": {
196+
"service": _PUBSUB_CE_SERVICE,
197+
"type": _PUBSUB_MESSAGE_TYPE,
198+
"name": _parse_pubsub_topic(request.path),
199+
},
200+
},
201+
"data": {
202+
"@type": _PUBSUB_MESSAGE_TYPE,
203+
"data": request_data["message"]["data"],
204+
"attributes": request_data["message"]["attributes"],
205+
},
206+
}
207+
except (AttributeError, KeyError, TypeError):
208+
raise EventConversionException("Failed to convert Pub/Sub payload to event")
209+
210+
211+
def _is_raw_pubsub_payload(request_data) -> bool:
212+
"""Does the given request body match the schema of a unmarshalled Pub/Sub request"""
213+
return (
214+
request_data is not None
215+
and "context" not in request_data
216+
and "subscription" in request_data
217+
and "message" in request_data
218+
and "data" in request_data["message"]
219+
and "messageId" in request_data["message"]
220+
)
221+
222+
223+
def _parse_pubsub_topic(request_path) -> Optional[str]:
224+
match = _PUBSUB_TOPIC_REQUEST_PATH.search(request_path)
225+
if match:
226+
return match.group(0)
227+
else:
228+
# It is possible to configure a Pub/Sub subscription to push directly to this function
229+
# without passing the topic name in the URL path.
230+
return ""

tests/test_convert.py

Lines changed: 126 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,23 +65,64 @@
6565

6666
BACKGROUND_RESOURCE_STRING = "projects/_/buckets/some-bucket/objects/folder/Test.cs"
6767

68+
PUBSUB_CLOUD_EVENT = {
69+
"specversion": "1.0",
70+
"id": "1215011316659232",
71+
"source": "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test",
72+
"time": "2020-05-18T12:13:19Z",
73+
"type": "google.cloud.pubsub.topic.v1.messagePublished",
74+
"datacontenttype": "application/json",
75+
"data": {
76+
"message": {
77+
"data": "10",
78+
},
79+
},
80+
}
81+
6882

6983
@pytest.fixture
7084
def pubsub_cloudevent_output():
71-
event = {
72-
"specversion": "1.0",
73-
"id": "1215011316659232",
74-
"source": "//pubsub.googleapis.com/projects/sample-project/topics/gcf-test",
75-
"time": "2020-05-18T12:13:19Z",
76-
"type": "google.cloud.pubsub.topic.v1.messagePublished",
77-
"datacontenttype": "application/json",
85+
return from_json(json.dumps(PUBSUB_CLOUD_EVENT))
86+
87+
88+
@pytest.fixture
89+
def raw_pubsub_request():
90+
return {
91+
"subscription": "projects/sample-project/subscriptions/gcf-test-sub",
92+
"message": {
93+
"data": "eyJmb28iOiJiYXIifQ==",
94+
"messageId": "1215011316659232",
95+
"attributes": {"test": "123"},
96+
},
97+
}
98+
99+
100+
@pytest.fixture
101+
def marshalled_pubsub_request():
102+
return {
78103
"data": {
79-
"message": {
80-
"data": "10",
104+
"@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage",
105+
"data": "eyJmb28iOiJiYXIifQ==",
106+
"attributes": {"test": "123"},
107+
},
108+
"context": {
109+
"eventId": "1215011316659232",
110+
"eventType": "google.pubsub.topic.publish",
111+
"resource": {
112+
"name": "projects/sample-project/topics/gcf-test",
113+
"service": "pubsub.googleapis.com",
114+
"type": "type.googleapis.com/google.pubsub.v1.PubsubMessage",
81115
},
116+
"timestamp": "2021-04-17T07:21:18.249Z",
82117
},
83118
}
84119

120+
121+
@pytest.fixture
122+
def raw_pubsub_cloudevent_output(marshalled_pubsub_request):
123+
event = PUBSUB_CLOUD_EVENT.copy()
124+
# the data payload is more complex for the raw pubsub request
125+
event["data"] = {"message": marshalled_pubsub_request["data"]}
85126
return from_json(json.dumps(event))
86127

87128

@@ -212,3 +253,79 @@ def test_split_resource_no_resource_regex_match():
212253
with pytest.raises(EventConversionException) as exc_info:
213254
event_conversion._split_resource(context)
214255
assert "Resource regex did not match" in exc_info.value.args[0]
256+
257+
258+
def test_marshal_background_event_data_without_topic_in_path(
259+
raw_pubsub_request, marshalled_pubsub_request
260+
):
261+
req = flask.Request.from_values(json=raw_pubsub_request, path="/myfunc/")
262+
payload = event_conversion.marshal_background_event_data(req)
263+
264+
# Remove timestamps as they get generates on the fly
265+
del marshalled_pubsub_request["context"]["timestamp"]
266+
del payload["context"]["timestamp"]
267+
268+
# Resource name is set to empty string when it cannot be parsed from the request path
269+
marshalled_pubsub_request["context"]["resource"]["name"] = ""
270+
271+
assert payload == marshalled_pubsub_request
272+
273+
274+
def test_marshal_background_event_data_with_topic_path(
275+
raw_pubsub_request, marshalled_pubsub_request
276+
):
277+
req = flask.Request.from_values(
278+
json=raw_pubsub_request,
279+
path="x/projects/sample-project/topics/gcf-test?pubsub_trigger=true",
280+
)
281+
payload = event_conversion.marshal_background_event_data(req)
282+
283+
# Remove timestamps as they are generated on the fly.
284+
del marshalled_pubsub_request["context"]["timestamp"]
285+
del payload["context"]["timestamp"]
286+
287+
assert payload == marshalled_pubsub_request
288+
289+
290+
def test_pubsub_emulator_request_to_cloudevent(
291+
raw_pubsub_request, raw_pubsub_cloudevent_output
292+
):
293+
req = flask.Request.from_values(
294+
json=raw_pubsub_request,
295+
path="x/projects/sample-project/topics/gcf-test?pubsub_trigger=true",
296+
)
297+
cloudevent = event_conversion.background_event_to_cloudevent(req)
298+
299+
# Remove timestamps as they are generated on the fly.
300+
del raw_pubsub_cloudevent_output["time"]
301+
del cloudevent["time"]
302+
303+
assert cloudevent == raw_pubsub_cloudevent_output
304+
305+
306+
def test_pubsub_emulator_request_to_cloudevent_without_topic_path(
307+
raw_pubsub_request, raw_pubsub_cloudevent_output
308+
):
309+
req = flask.Request.from_values(json=raw_pubsub_request, path="/")
310+
cloudevent = event_conversion.background_event_to_cloudevent(req)
311+
312+
# Remove timestamps as they are generated on the fly.
313+
del raw_pubsub_cloudevent_output["time"]
314+
del cloudevent["time"]
315+
316+
# Default to the service name, when the topic is not configured subscription's pushEndpoint.
317+
raw_pubsub_cloudevent_output["source"] = "//pubsub.googleapis.com/"
318+
319+
assert cloudevent == raw_pubsub_cloudevent_output
320+
321+
322+
def test_pubsub_emulator_request_with_invalid_message(
323+
raw_pubsub_request, raw_pubsub_cloudevent_output
324+
):
325+
# Create an invalid message payload
326+
raw_pubsub_request["message"] = None
327+
req = flask.Request.from_values(json=raw_pubsub_request, path="/")
328+
329+
with pytest.raises(EventConversionException) as exc_info:
330+
cloudevent = event_conversion.background_event_to_cloudevent(req)
331+
assert "Failed to convert Pub/Sub payload to event" in exc_info.value.args[0]

0 commit comments

Comments
 (0)