The Salesforce connector enables streaming of Salesforce platform events and Change Data Capture (CDC) events by using the Faye client or Bauyex protocol. This connector also supports discovery of custom objects and properties.
- Ensure streaming API is enabled for your Salesforce edition and organization.
- Ensure you have the required permissions set up in Salesforce to use Change Data Capture objects.
- Ensure you have the required permissions set up in Salesforce to access the specified objects and events.
- Set the Session Security Level at login value to
None
instead ofHigh Assurance
. - To connect to Salesforce sandboxes or subdomains and use Salesforce as a source system to trigger events, enable the Salesforce Organization object in your Salesforce environment.
- If using Change Data Capture (CDC) events, ensure that CDC is enabled for the specified objects in Salesforce.
The connectivitypack.source
and connectivitypack.source.url
configurations in the KafkaConnector
custom resource provide the connector with the required information to connect to the data source.
Name | Value or Description |
---|---|
connectivitypack.source |
salesforce |
connectivitypack.source.url |
Specifies the URL of the source system. For example, for Salesforce, the base URL of your instance is https://<yourinstance>.salesforce.com . |
You can configure the following authentication mechanisms for Salesforce in the KafkaConnector
custom resource depending on the authentication flow in Salesforce.
- Use Case: Recommended for most applications.
- Required Credentials:
- Client Identity: Obtain this by creating a Connected App in Salesforce and locating the Consumer Key under the application's settings.
- Client Secret: Available in the Connected App configuration alongside the Consumer Key.
- Access Token and Refresh Token: Generated by performing an OAuth flow with the configured Connected App.
For more information, see the Salesforce OAuth 2.0 Documentation.
Name | Description |
---|---|
connectivitypack.source.credentials.authType | BASIC_OAUTH - Specifies that the connector will use Basic OAuth for authentication. |
connectivitypack.source.credentials.clientSecret | The client secret of the Salesforce connected app used for Basic OAuth authentication. |
connectivitypack.source.credentials.clientIdentity | The client ID (or consumer key) of the Salesforce connected app used for Basic OAuth authentication. |
connectivitypack.source.credentials.accessTokenBasicOauth | The access token used for Basic OAuth authentication with Salesforce. |
connectivitypack.source.credentials.refreshTokenBasicOauth | The refresh token used to renew the OAuth access token for Basic OAuth authentication. |
- Use Case: Legacy applications where Basic OAuth is not applicable.
- Required Credentials:
- Username and Password: Use the Salesforce account’s credentials.
- Client Identity and Client Secret: Same as Basic OAuth, obtained from the Connected App settings.
- Important Note: Salesforce has deprecated the OAuth2 Password grant type. If you're using this method, plan to migrate to Basic OAuth to ensure future compatibility.
Name | Description |
---|---|
connectivitypack.source.credentials.authType | OAUTH2_PASSWORD - Specifies that the connector will use OAuth 2.0 Password authentication. |
connectivitypack.source.credentials.username | The Salesforce username required for OAuth2 Password authentication. |
connectivitypack.source.credentials.password | The Salesforce password associated with the username for OAuth2 Password authentication. |
connectivitypack.source.credentials.clientSecret | The client secret of the Salesforce Connected App required for OAuth2 Password authentication. |
connectivitypack.source.credentials.clientIdentity | The client ID (or consumer key) of the Salesforce Connected App required for OAuth2 Password authentication. |
You can specify any of the following objects and associated events in the connectivitypack.source.<object>
and the connectivitypack.source.<object>.events
sections of the KafkaConnector
custom resource:
Salesforce platform events deliver custom event notifications when something meaningful happens to objects that are defined in your Salesforce organization. Platform events are dynamic in nature and specific to the endpoint account connected, and as a result are not shown in the static list.
Objects | Events |
---|---|
Platform Event objects | CREATED |
Salesforce provides queues for recording platform events and each event notification has a unique replay ID. Salesforce retains platform events for 72 hours, and a user can store a replay ID value to use when subscribing again to retrieve events during the retention window, as described in the Salesforce documentation.
The Salesforce connector uses the replay ID to track Salesforce platform events it has received. If the connector is restarted for any reason, it resumes streaming from where it stopped by using the replay ID. If the replay ID is no longer valid (more than 72 hours old), the connector will not be able to resume. Instead, it will start a new subscription to receive events from the current time.
Salesforce CDC events provide notifications of state changes to objects that you are interested in.
Note: CDC must be enabled by customers, and it is only available for objects in the dynamic list.
All custom objects and a subset of standard objects are supported for use with Change Data Capture in Salesforce. For the full list, see Change Event Object Support.
Objects | Events |
---|---|
Change Data Capture objects | CREATED, UPDATED, DELETED |
The following is an example of a connector configuration for Salesforce:
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
labels:
# The eventstreams.ibm.com/cluster label identifies the Kafka Connect instance
# in which to create this connector. That KafkaConnect instance
# must have the eventstreams.ibm.com/use-connector-resources annotation
# set to true.
eventstreams.ibm.com/cluster: cp-connect-cluster
name: <name>
namespace: <namespace>
spec:
# Connector class name
class: com.ibm.eventstreams.connect.connectivitypacksource.ConnectivityPackSourceConnector
config:
# Which data source to connect to, for example, salesforce
connectivitypack.source: salesforce
# URL to access the data source, for example, `https://<your-instance-name>.salesforce.com`
connectivitypack.source.url: <URL-of-the-data-source-instance>
# Credentials to access the data source using OAUTH2_PASSWORD authentication.
connectivitypack.source.credentials.authType: OAUTH2_PASSWORD
connectivitypack.source.credentials.username: <username>
connectivitypack.source.credentials.password: <password>
connectivitypack.source.credentials.clientSecret: <client-secret>
connectivitypack.source.credentials.clientIdentity: <client-identity>
# Objects and event types to read from the data source
connectivitypack.source.objects: '<object1>,<object2>'
connectivitypack.source.<object1>.events: 'CREATED'
connectivitypack.source.<object2>.events: 'CREATED,UPDATED'
# Optional, sets the format for Kafka topic names created by the connector.
# You can use placeholders such as '${object}' and '${eventType}', which the connector will replace automatically.
# Including '${object}' or '${eventType}' in the format is optional. For example, '${object}-topic-name' is a valid format.
# By default, the format is '${object}-${eventType}', but it's shown here for clarity.
connectivitypack.topic.name.format: '${object}-${eventType}'
# `tasksMax` must be equal to the number of object-eventType combinations
# In this example it is 3 (object1 - CREATED, object2 - CREATED, object2 - UPDATED)
tasksMax: 3
# Specifies the converter class used to deserialize the message value.
# Change this to a different converter (for example, AvroConverter) as applicable.
value.converter: org.apache.kafka.connect.json.JsonConverter
# Controls whether the schema is included in the message.
# Set this to false to disable schema support, or to true to enable schema inclusion (for example, for Avro).
value.converter.schemas.enable: false