Invalid Read of Continuous State Motor
Pull subscriptions
In pull delivery, your subscriber application initiates requests to the Pub/Sub server to retrieve messages.
You can use the pull mode to perform asynchronous or synchronous message processing. The pull mode runs in a client library that can use one of the two service APIs, Pull or StreamingPull. The client library can also be a high-level or a low-level client library depending on your requirements.
Before you begin
Before reading this document, ensure that you are familiar with the following:
- How Pub/Sub works and the different Pub/Sub terms.
- The different kinds of subscriptions that Pub/Sub supports and why you might want to use a pull subscription.
Asynchronous Pull
Asynchronous pulling provides higher throughput because your application does not have to block while waiting for new messages. Messages can be received in your application using a long-running message listener, and acknowledged one message at a time, as shown in the example below. Java, Python, .NET, Go, and Ruby clients use the StreamingPull service API to implement the asynchronous client API efficiently.
Not all client libraries support asynchronously pulling messages. To learn about synchronously pulling messages, see Synchronous Pull.
For more information, see the API Reference documentation in your programming language.
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.
Go
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
Ruby
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.
Processing Custom Attributes
This sample shows how to pull messages asynchronously and retrieve the custom attributes from metadata:
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.
Go
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
Ruby
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.
Listening for Errors
This sample shows how to handle errors that arise when subscribing to messages:
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.
Go
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
Ruby
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.
Message Flow Control
Your subscriber client might process and acknowledge messages more slowly than Pub/Sub sends them to the client. In this case:
- It's possible that one client could have a backlog of messages because it doesn't have the capacity to process the volume of incoming messages, but another client on the network does have that capacity. The second client could reduce the subscription's backlog, but it doesn't get the chance to do so because the first client maintains a lease on the messages that it receives. This reduces the overall rate of processing because messages get stuck on the first client.
- Because the client library repeatedly extends the acknowledgement deadline for backlogged messages, those messages continue to consume memory, CPU, and bandwidth resources. As such, the subscriber client might run out of resources (such as memory). This can adversely impact the throughput and latency of processing messages.
To mitigate the issues above, use the flow control features of the subscriber to control the rate at which the subscriber receives messages. These flow control features are illustrated in the following samples:
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.
Go
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
Ruby
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.
More generally, the need for flow control indicates that messages are being published at a higher rate than they are being consumed. If this is a persistent state, rather than a transient spike in message volume, consider increasing the number of subscriber client instances.
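The effect of flow control can be illustrated with a small toy model. This is only a sketch using the Python standard library, not the client library's implementation: the class FlowControlledReceiver, its max_outstanding parameter, and the run_demo helper are hypothetical names introduced for illustration. The model caps the number of messages a subscriber holds at any one time, which is the idea behind the flow control settings described above.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class FlowControlledReceiver:
    """Toy model (hypothetical): at most `max_outstanding` messages are held at once."""

    def __init__(self, max_outstanding):
        self._slots = threading.BoundedSemaphore(max_outstanding)
        self._lock = threading.Lock()
        self._outstanding = 0
        self.peak_outstanding = 0

    def deliver(self, message, callback):
        # Block until a slot is free, so the receiver never holds more than
        # `max_outstanding` messages that have not been processed yet.
        self._slots.acquire()
        with self._lock:
            self._outstanding += 1
            self.peak_outstanding = max(self.peak_outstanding, self._outstanding)
        try:
            callback(message)
        finally:
            with self._lock:
                self._outstanding -= 1
            self._slots.release()


def run_demo(total_messages=20, max_outstanding=3, workers=8):
    """Push messages through the receiver from more threads than there are slots."""
    receiver = FlowControlledReceiver(max_outstanding)
    processed = []
    processed_lock = threading.Lock()

    def callback(message):
        time.sleep(0.01)  # stand-in for real message processing
        with processed_lock:
            processed.append(message)

    # Leaving the `with` block waits for every submitted delivery to finish.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for i in range(total_messages):
            pool.submit(receiver.deliver, i, callback)

    return receiver.peak_outstanding, len(processed)


if __name__ == "__main__":
    peak, count = run_demo()
    print(f"processed {count} messages, peak outstanding = {peak}")
```

Even though eight worker threads try to deliver messages, the peak number of outstanding messages never exceeds the configured limit. In a real subscriber, the messages that are held back stay on the service and remain available to other clients.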
Concurrency Control
Support for concurrency depends on your programming language. For language implementations that support parallel threads, such as Java and Go, the client libraries make a default choice for the number of threads. This choice may not be optimal for your application. For example, if you find that your subscriber application is not keeping up with the incoming message volume but is not CPU-bound, you should increase the thread count. For CPU-intensive message processing operations, reducing the number of threads might be appropriate.
The following samples illustrate how to control concurrency in a subscriber:
Support for concurrency depends on your programming language. Refer to the API Reference documentation for more information.
Exactly once delivery
With exactly once delivery, no redelivery occurs once the message has been successfully acknowledged. This means that the acknowledgement future for a received message must return successfully.
The following sample illustrates how to receive messages from a subscription with exactly once delivery enabled:
Using schema
These samples show how to process messages when subscribing to messages on topics configured with schema. To learn more, see Creating and managing schemas.
C++
Before trying this sample, follow the C++ setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C++ API reference documentation.
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.
Go
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
PHP
Before trying this sample, follow the PHP setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub PHP API reference documentation.
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
Ruby
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.
StreamingPull
The Pub/Sub service has two APIs for retrieving messages:
- Pull
- StreamingPull
Where possible, the Cloud Client libraries use StreamingPull for maximum throughput and lowest latency. Although you might never use the StreamingPull API directly, it is important to understand some crucial properties of StreamingPull and how it differs from the more traditional Pull method.
The Pull method relies on a request/response model:
- The client sends a request to the server for messages.
- If the user is out of throughput quota, the server returns a RESOURCE_EXHAUSTED error.
- The server replies with zero or more messages and closes the connection.
The StreamingPull service API relies on a persistent bidirectional connection to receive multiple messages as they become available:
- The client sends a request to the server to establish a connection.
- If the user is out of connections quota, the server returns a RESOURCE_EXHAUSTED error.
- The server continuously sends messages to the connected client.
- If/when the user runs out of throughput quota, the stream is suspended, but the connection is not broken. When there is sufficient throughput quota available again, the connection is resumed.
- The connection is eventually closed either by the client or the server.
You provide a callback to the subscriber and the subscriber asynchronously runs the callback for each message. If a subscriber receives messages with the same ordering key, the client libraries sequentially run the callback. The Pub/Sub service delivers these messages to the same subscriber on a best-effort basis.
StreamingPull has a 100% error rate (this is to be expected)
StreamingPull streams always close with a non-OK status. Note that, unlike in regular RPCs, the status here is simply an indication that the stream has been broken, not that requests are failing. Therefore, while the StreamingPull API may have a seemingly surprising 100% error rate, this is by design.
Diagnosing StreamingPull errors
Because StreamingPull streams always close with an error, it isn't helpful to examine stream termination metrics while diagnosing errors. Rather, focus on the StreamingPull response metric (subscription/streaming_pull_response_count). Look for these errors:
- FAILED_PRECONDITION errors can occur in these cases:
  - Pub/Sub attempts to decrypt a message with a disabled Cloud KMS key.
  - Subscriptions can be temporarily suspended if there are messages in the subscription backlog that are encrypted with a disabled Cloud KMS key.
- UNAVAILABLE errors
StreamingPull: Dealing with large backlogs of small messages
The gRPC StreamingPull stack is optimized for high throughput and therefore buffers messages. This can have some consequences if you are attempting to process large backlogs of small messages (rather than a steady stream of new messages). Under these conditions, you may see messages delivered multiple times and they may not be load balanced effectively across clients.
The buffer between the Pub/Sub service and the client library user space is roughly 10MB. To understand the impact of this buffer on client library behavior, consider this example:
- There is a backlog of 10,000 1KB messages on a subscription.
- Each message takes 1 second to process sequentially, by a single-threaded client instance.
- The first client instance to establish a StreamingPull connection to the service for that subscription will fill its buffer with all 10,000 messages.
- It takes 10,000 seconds (almost 3 hours) to process the buffer.
- In that time, some of the buffered messages exceed their acknowledgement deadline and are re-sent to the same client, resulting in duplicates.
- When multiple client instances are running, the messages stuck in the one client's buffer are not available to any of the other client instances.
This situation will not occur if you use flow control for StreamingPull: the service never has the entire 10MB of messages at a time and so is able to effectively load balance messages across multiple subscribers.
To address this situation, either use a push subscription or the Pull API, currently available in some of the Cloud Client Libraries (see the Synchronous Pull section) and all API Client libraries. To learn more, see the Client Libraries documentation.
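The arithmetic behind the backlog example above can be checked with a short sketch. It assumes decimal units (1KB = 1,000 bytes, 10MB = 10,000,000 bytes) and treats the "roughly 10MB" buffer as exactly 10MB. The function names and the max_outstanding cap of 100 messages are hypothetical and only illustrate how a flow control limit changes the numbers.

```python
BUFFER_BYTES = 10 * 1000 * 1000   # "roughly 10MB" from the text; exact size is an assumption
MESSAGE_BYTES = 1000              # 1KB per message, decimal units assumed
BACKLOG = 10_000                  # messages waiting on the subscription
SECONDS_PER_MESSAGE = 1           # sequential processing time per message


def messages_buffered(backlog, message_bytes, buffer_bytes, max_outstanding=None):
    """Number of messages one client ends up holding in its buffer."""
    held = min(backlog, buffer_bytes // message_bytes)
    if max_outstanding is not None:
        # Hypothetical flow control cap on messages held by the client.
        held = min(held, max_outstanding)
    return held


def seconds_to_drain(messages, seconds_per_message, threads=1):
    """Time for `threads` workers to process `messages` messages."""
    return messages * seconds_per_message / threads


# Without flow control: the whole backlog fits in the first client's buffer.
held = messages_buffered(BACKLOG, MESSAGE_BYTES, BUFFER_BYTES)
drain = seconds_to_drain(held, SECONDS_PER_MESSAGE)
hours = drain / 3600

# With a hypothetical cap of 100 outstanding messages, the rest of the backlog
# stays on the service and can be delivered to other clients.
held_capped = messages_buffered(BACKLOG, MESSAGE_BYTES, BUFFER_BYTES, max_outstanding=100)
drain_capped = seconds_to_drain(held_capped, SECONDS_PER_MESSAGE)

if __name__ == "__main__":
    print(f"uncapped: {held} messages held, {drain:.0f} s ({hours:.2f} h) to drain")
    print(f"capped:   {held_capped} messages held, {drain_capped:.0f} s to drain")
```

Under these assumptions the single-threaded client holds all 10,000 messages and needs about 2.78 hours to drain them, which matches the "almost 3 hours" figure in the example.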
Synchronous Pull
There are cases when the asynchronous Pull is not a perfect fit for your application. For example, the application logic might rely on a polling pattern to retrieve messages or require a precise cap on a number of messages retrieved by the client at any given time. To support such applications, the service supports a synchronous Pull method.
Here is some sample code to pull and acknowledge a fixed number of messages:
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.
Go
Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
PHP
Before trying this sample, follow the PHP setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub PHP API reference documentation.
Protocol
Request:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{ "returnImmediately": "false", "maxMessages": "1" }
Response:
200 OK
{ "receivedMessages": [{ "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...", "message": { "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==", "messageId": "19917247034" } }] }
Request:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{ "ackIds": [ "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..." ] }
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
Ruby
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.
Pub/Sub delivers a list of messages. If the list has multiple messages, Pub/Sub orders the messages with the same ordering key.
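As a minimal sketch of what a client does with the Protocol response shown above, the following Python code parses the response body, base64-decodes the message data, and builds the body of the follow-up acknowledge request. It uses only the standard library and does not send any HTTP requests. The function names are hypothetical, and the ackId is kept truncated exactly as it appears in the sample.

```python
import base64
import json

# Response body copied from the Protocol sample above (ackId is truncated there).
PULL_RESPONSE = """
{ "receivedMessages": [{
    "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
    "message": {
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
      "messageId": "19917247034"
    }
}] }
"""


def decode_pull_response(body):
    """Return a list of (ack_id, message_id, payload_text) tuples."""
    parsed = json.loads(body)
    results = []
    # An empty pull response may omit "receivedMessages" entirely.
    for received in parsed.get("receivedMessages", []):
        message = received["message"]
        # Message data is base64-encoded in the JSON representation.
        payload = base64.b64decode(message.get("data", "")).decode("utf-8")
        results.append((received["ackId"], message["messageId"], payload))
    return results


def build_ack_request(ack_ids):
    """Build the JSON body for the :acknowledge request."""
    return json.dumps({"ackIds": list(ack_ids)})


messages = decode_pull_response(PULL_RESPONSE)
ack_body = build_ack_request(ack_id for ack_id, _, _ in messages)

if __name__ == "__main__":
    for ack_id, message_id, payload in messages:
        print(message_id, payload)
    print(ack_body)
```

The decoded payload of the sample message is the text "Hello Cloud Pub/Sub! Here is my message!". Acknowledging only after the payload has been processed successfully keeps a failed message eligible for redelivery.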
Synchronous Pull with Lease Management
An individual message's processing may exceed the preconfigured acknowledgement deadline, also known as the lease. A good use case for synchronous pull with lease management is when you are processing hundreds of messages in parallel, and each message takes a long time to process. To avoid redelivery on expired messages, the client libraries provide a way to reset their acknowledgement deadlines (except for the Go client library, which automatically modifies the acknowledgement deadlines for polled messages), as shown by the samples below:
C#
Before trying this sample, follow the C# setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub C# API reference documentation.
Java
Before trying this sample, follow the Java setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Java API reference documentation.
Node.js
Before trying this sample, follow the Node.js setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Node.js API reference documentation.
Python
Before trying this sample, follow the Python setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Python API reference documentation.
Ruby
Before trying this sample, follow the Ruby setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Ruby API reference documentation.
Scaling
You may need to implement a scaling mechanism for your subscriber application to keep up with message volume. How to do this depends on your environment, but it is generally based on the backlog metrics offered through the monitoring service in Google Cloud's operations suite. For details on how to do this for Compute Engine, see Scaling based on Cloud Monitoring Metrics.
Go to the Pub/Sub section of the GCP Metrics List page to learn which metrics can be monitored programmatically.
Finally, as with all distributed services, expect to occasionally retry every request.
Dealing with duplicates and forcing retries
When you do not acknowledge a message before its acknowledgement deadline has expired, Pub/Sub resends the message. As a result, Pub/Sub can send duplicate messages. Use Cloud Monitoring to monitor acknowledge operations with the expired response code to detect this condition. To get this data, select the subscription/expired_ack_deadlines_count metric.
To reduce the duplication rate, extend the message deadline.
- Client libraries handle deadline extension automatically, but you should note that there are default limits on the maximum extension deadline that can be configured.
- If you are building your own client library, use the modifyAckDeadline method to extend the acknowledgement deadline.
Alternatively, to force Pub/Sub to retry a message, call modifyAckDeadline with an acknowledgement deadline of 0.
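For readers building their own client, the following sketch shows how a modifyAckDeadline request could be assembled, following the same URL pattern as the pull and acknowledge requests in the Protocol sample. It only builds the URL and JSON body and sends nothing. The function name is hypothetical, and the 600-second upper bound reflects the documented maximum for a single request as best understood here, so verify it against the API reference.

```python
import json

BASE_URL = "https://pubsub.googleapis.com/v1"
MAX_DEADLINE_SECONDS = 600  # assumed per-request maximum; check the API reference


def modify_ack_deadline_request(project, subscription, ack_ids, deadline_seconds):
    """Return (url, json_body) for a modifyAckDeadline call.

    A deadline of 0 makes the messages immediately eligible for redelivery;
    a positive value extends the lease by that many seconds from now.
    """
    if not 0 <= deadline_seconds <= MAX_DEADLINE_SECONDS:
        raise ValueError("deadline_seconds must be between 0 and 600")
    url = (
        f"{BASE_URL}/projects/{project}/subscriptions/"
        f"{subscription}:modifyAckDeadline"
    )
    body = {"ackIds": list(ack_ids), "ackDeadlineSeconds": deadline_seconds}
    return url, json.dumps(body)


# Extend the lease on one message by 60 seconds.
extend_url, extend_body = modify_ack_deadline_request(
    "myproject", "mysubscription", ["example-ack-id"], 60
)

# Force a retry by setting the deadline to 0.
retry_url, retry_body = modify_ack_deadline_request(
    "myproject", "mysubscription", ["example-ack-id"], 0
)

if __name__ == "__main__":
    print("POST", extend_url)
    print(extend_body)
    print(retry_body)
```

The project, subscription, and ack ID values are placeholders. In practice the ack IDs come from a preceding pull response.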
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
Last updated 2022-10-12 UTC.
Source: https://cloud.google.com/pubsub/docs/pull