Skip to content

Commit

Permalink
fix: update grpc based ReadObject rpcs to remove race condition betwe…
Browse files Browse the repository at this point in the history
…en cancellation and message handling (#2708)

Update GapicUnbufferedReadableByteChannel to manage the grpc stream itself rather than using the stream iterator provided by gax. This allows us to ensure the cancellation is observed and our draining performs before returning from close().

As a side effect of not using the gax stream iterator, we now must handle stream restarts ourselves. GrpcStorageOptions.ReadObjectResumptionStrategy has been removed entirely, while RetryingDependencies and ResultRetryAlgorithm are now plumbed all the way down to the GapicUnbufferedReadableByteChannel.
  • Loading branch information
BenWhitehead authored Sep 20, 2024
1 parent 260e8ea commit 2c7f088
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
import com.google.cloud.storage.Retrying.RetryingDependencies;
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.storage.v2.Object;
Expand All @@ -43,27 +45,32 @@ public static GapicDownloadSessionBuilder create() {
return INSTANCE;
}

/**
* Any retry capability must be defined within the provided ServerStreamingCallable. The
* ultimately produced channel will not do any retries of its own.
*/
public ReadableByteChannelSessionBuilder byteChannel(
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
RetryingDependencies retryingDependencies,
ResultRetryAlgorithm<?> resultRetryAlgorithm,
ResponseContentLifecycleManager responseContentLifecycleManager) {
return new ReadableByteChannelSessionBuilder(read, responseContentLifecycleManager);
return new ReadableByteChannelSessionBuilder(
read, retryingDependencies, resultRetryAlgorithm, responseContentLifecycleManager);
}

public static final class ReadableByteChannelSessionBuilder {

private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
private final RetryingDependencies retryingDependencies;
private final ResultRetryAlgorithm<?> resultRetryAlgorithm;
private final ResponseContentLifecycleManager responseContentLifecycleManager;
private boolean autoGzipDecompression;
private Hasher hasher;

private ReadableByteChannelSessionBuilder(
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
RetryingDependencies retryingDependencies,
ResultRetryAlgorithm<?> resultRetryAlgorithm,
ResponseContentLifecycleManager responseContentLifecycleManager) {
this.read = read;
this.retryingDependencies = retryingDependencies;
this.resultRetryAlgorithm = resultRetryAlgorithm;
this.responseContentLifecycleManager = responseContentLifecycleManager;
this.hasher = Hasher.noop();
this.autoGzipDecompression = false;
Expand Down Expand Up @@ -105,12 +112,24 @@ public UnbufferedReadableByteChannelSessionBuilder unbuffered() {
if (autoGzipDecompression) {
return new GzipReadableByteChannel(
new GapicUnbufferedReadableByteChannel(
resultFuture, read, object, hasher, responseContentLifecycleManager),
resultFuture,
read,
object,
hasher,
retryingDependencies,
resultRetryAlgorithm,
responseContentLifecycleManager),
ApiFutures.transform(
resultFuture, Object::getContentEncoding, MoreExecutors.directExecutor()));
} else {
return new GapicUnbufferedReadableByteChannel(
resultFuture, read, object, hasher, responseContentLifecycleManager);
resultFuture,
read,
object,
hasher,
retryingDependencies,
resultRetryAlgorithm,
responseContentLifecycleManager);
}
};
}
Expand Down
Loading

0 comments on commit 2c7f088

Please sign in to comment.