Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connector cps.endpoint config does not target emulator #290

Open
Axelcouty opened this issue Nov 27, 2023 · 2 comments
Open

Connector cps.endpoint config does not target emulator #290

Axelcouty opened this issue Nov 27, 2023 · 2 comments
Assignees
Labels
type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@Axelcouty
Copy link

Hi,

I'm trying to use this connector locally against the pubsub emulator. google-cloud-cli:454.0.0-emulators

I am able to create a topic and a subscription with java client andhile I could make the connector work with existing topic & subscription for my true, existing gcloud project I'm not able to do so with the emulator.

Both the emulator & kafka-connect are started as part of a docker-compose:

  • kafka-connect
    • version: 6.2.1
    • connector version: 1.2.0
    • notable environment variables
      • PUBSUB_EMULATOR_HOST: pubsub:8085
      • CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB: http(s):pubsub:8085 (I tried several)
  • Emulator
    • version: 454.0.0-emulators
    • stat cmd: gcloud beta emulators pubsub start --host-port 0.0.0.0:8085

I tried with this source connector example:

{
  "cps.subscription": "vmchIIQhNG",
  "metadata.publish": "false",
  "name": "test-connector-5800cc7b-a616-4bec-a7b5-b4d5128dbc3b",
  "kafka.partition.count": "1",
  "cps.endpoint": "pubsub:8085",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "cps.project": "my-project",
  "kafka.topic": "test-363210ef-c078-462b-8c46-bc9213a03e2e",
  "kafka.record.headers": "true",
  "headers.publish": "true",
  "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
}

When I have a look at the connector's state I see the following exception raised:

rg.apache.kafka.connect.errors.ConnectException: Error verifying the subscription vmchIIQhNG for project eixample-dev
	at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.verifySubscription(CloudPubSubSourceConnector.java:314)
	at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.start(CloudPubSubSourceConnector.java:146)
	at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
	at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
	at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
	at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
	at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
	at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=vmchIIQhNG).
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541)
	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:757)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:736)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	... 3 more
	Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
		at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
		at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
		at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.verifySubscription(CloudPubSubSourceConnector.java:312)
		at com.google.pubsub.kafka.source.CloudPubSubSourceConnector.start(CloudPubSubSourceConnector.java:146)
		at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184)
		at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209)
		at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348)
		at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331)
		at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140)
		at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117)
		at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
		... 3 more
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=vmchIIQhNG).
	at io.grpc.Status.asRuntimeException(Status.java:539)
	... 17 more

It looks to me I'm not able to understand what are the required configuration to be able to hit the local emulator from kafka connect.


Additional infos :

My use case is simply to be able to write tests locally when working on SMT development.

Docker compose, pubsub part:

  pubsub:
    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:454.0.0-emulators
    command: 
      - bash
      - -c
      - "gcloud beta emulators pubsub start --host-port 0.0.0.0:8085"
    ports:
      - "8085:8085"

Thanks for your help.

@samarthsingal samarthsingal added the type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. label Dec 19, 2023
@samarthsingal
Copy link
Contributor

We had not anticipated the use-case of someone using the pubsub-group-kafka-connector against the CPS emulator. To support this, we will need to change the grpcTransportChannelProvider-based channel construction verifySubscription to respect the configured cps.endpoint and possibly make other changes as well. We can consider this a FR.

is this blocking deployment of the connector for you?

@Axelcouty
Copy link
Author

Thanks having a look 👀,

It is not blocking our deployment but it does prevent us to avoid creating GCP resources when testing locally or within our CI pipelines.
We implements custom SMTs for Kafka Connect and test with different Kafka Connectors, we use testcontainers quite a lot and I was trying to integrate it.

Considering subscriptions share the messages from a topic it can be cumbersome to have to create a new subscriptions or even for developers, and provide tooling around it or process to avoid forgotten GCP created resources.

While we could just use isolated environment with the emulator.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.
Projects
None yet
Development

No branches or pull requests

3 participants