In this quickstart, you'll create two microservices, one with an input binding and another with an output binding. You'll bind to Kafka, but note that there are a myriad of components that Dapr can bind to (see Dapr components).
This quickstart includes two microservices:
- Node.js microservice that utilizes an input binding
- Python microservice that utilizes an output binding
The bindings connect to Kafka, allowing us to push messages into a Kafka instance (from the Python microservice) and receive message from that instance (from the Node microservice) without having to know where the instance is hosted. Instead, connect through the sidecars using the Dapr API. See architecture diagram to see how the components interconnect locally:
Dapr allows us to deploy the same microservices from the local machines to Kubernetes. Correspondingly, this quickstart has instructions for deploying this project locally or in Kubernetes.
Clone this quickstarts repository to your local machine:
git clone [-b <dapr_version_tag>] https://github.com/dapr/quickstarts.git
Note: See https://github.com/dapr/quickstarts#supported-dapr-runtime-version for supported tags. Use
git clone https://github.com/dapr/quickstarts.git
when using the edge version of dapr runtime.
In order to run the Kafka bindings quickstart locally, you will run the Kafka broker server in a docker container on your machine.
- To run the container locally, run:
docker-compose -f ./docker-compose-single-kafka.yml up -d
- To see the container running locally, run:
docker ps
The output should be similar to this:
342d3522ca14 kafka-docker_kafka "start-kafka.sh" 14 hours ago Up About
a minute 0.0.0.0:9092->9092/tcp kafka-docker_kafka_1
0cd69dbe5e65 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 8 days ago Up About
a minute 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp kafka-docker_zookeeper_1
Now that you have Kafka running locally on your machine, you'll need to run the microservices. You'll start by running the Node microservice that uses input bindings:
- Navigate to Node subscriber directory in your CLI:
cd nodeapp
- Install dependencies:
npm install
- Run Node quickstart app with Dapr:
dapr run --app-id bindings-nodeapp --app-port 3000 node app.js --resources-path ../components
Next, run the Python microservice that uses output bindings
- Open a new CLI window and navigate to Python subscriber directory in your CLI:
cd pythonapp
- Install dependencies:
pip3 install requests
- Run Python quickstart app with Dapr:
dapr run --app-id bindings-pythonapp python3 app.py --resources-path ../components
- Observe the Python logs, which show a successful output binding with Kafka:
[0m?[94;1m== APP == {'data': {'orderId': 1}}
[0m?[94;1m== APP == Response for order 1: 204
[0m?[94;1m== APP == {'data': {'orderId': 2}}
[0m?[94;1m== APP == Response for order 2: 204
[0m?[94;1m== APP == {'data': {'orderId': 3}}
[0m?[94;1m== APP == Response for order 3: 204
- Observe the Node logs, which show a successful input binding with Kafka:
[0m?[94;1m== APP == { orderId: 1 }
[0m?[94;1m== APP == Hello from Kafka!
[0m?[94;1m== APP == { orderId: 2 }
[0m?[94;1m== APP == Hello from Kafka!
[0m?[94;1m== APP == { orderId: 3 }
[0m?[94;1m== APP == Hello from Kafka!
To cleanly stop the dapr microservices, run:
dapr stop --app-id bindings-nodeapp
dapr stop --app-id bindings-pythonapp
Once you're done, you can spin down your local Kafka Docker Container by running:
docker-compose -f ./docker-compose-single-kafka.yml down
- Install Kafka via bitnami/kafka
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
kubectl create ns kafka
helm install dapr-kafka bitnami/kafka --wait --namespace kafka -f ./kafka-non-persistence.yaml
- Wait until kafka pods are running
kubectl -n kafka get pods -w
NAME READY STATUS RESTARTS AGE
dapr-kafka-0 1/1 Running 0 2m7s
dapr-kafka-zookeeper-0 1/1 Running 0 2m57s
Now that the Kafka binding is set up, deploy the assets.
- In your CLI window, in the bindings directory run:
kubectl apply -f ./deploy
This will deploy bindings-nodeapp and bindings-pythonapp microservices. It will also apply the Kafka bindings component configuration you set up in the last step.
Kubernetes deployments are asyncronous. This means you'll need to wait for the deployment to complete before moving on to the next steps. You can do so with the following command:
kubectl rollout status deploy/bindings-nodeapp
kubectl rollout status deploy/bindings-pythonapp
- Run
kubectl get pods
to see that pods were correctly provisioned.
- Observe the Python app logs, which show a successful output binding with Kafka:
kubectl get pods
The output should look like this:
NAME READY STATUS RESTARTS AGE
bindings-nodeapp-699489b8b6-mqhrj 2/2 Running 0 4s
bindings-pythonapp-644489969b-c8lg5 2/2 Running 0 4m9s
Look at the Python app logs by running:
kubectl logs --selector app=bindingspythonapp -c python --tail=-1
...
{'data': {'orderId': 10}, 'operation': 'create'}
Response for order 10: 204
{'data': {'orderId': 11}, 'operation': 'create'}
Response for order 11: 204
...
- Observe the Node app logs, which show a successful input bining with Kafka:
kubectl get pods
The output should look like this:
NAME READY STATUS RESTARTS AGE
bindings-nodeapp-699489b8b6-mqhrj 2/2 Running 0 4s
bindings-pythonapp-644489969b-c8lg5 2/2 Running 0 4m9s
Look at the Node app logs by running:
kubectl logs --selector app=bindingsnodeapp -c node --tail=-1
The output should look like this:
Node App listening on port 3000!
...
Hello from Kafka!
{ orderId: 240 }
Hello from Kafka!
{ orderId: 241 }
...
Once you're done, you can spin down your Kubernetes resources by running:
kubectl delete -f ./deploy
This will spin down each resource defined by the .yaml files in the deploy
directory, including the kafka component.
Once you delete all quickstart apps, delete Kafka in the cluster.
helm uninstall dapr-kafka --namespace kafka
And finally, you can delete the kafka namespace
kubectl delete ns kafka
Now that you've run the quickstart locally and/or in Kubernetes, let's unpack how this all works. The app is broken up into input binding app and output binding app:
Before looking at the application code, let's see the Kafka bindings component yamls(local, and Kubernetes), which specify brokers
for Kafka connection, topics
and consumerGroup
for consumer, and publishTopic
for publisher topic.
See the howtos in references for the details on input and output bindings
This configuration yaml creates sample-topic
component to set up Kafka input and output bindings through the Kafka sample
topic.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: sample-topic
spec:
type: bindings.kafka
version: v1
metadata:
# Kafka broker connection setting
- name: brokers
value: [kafka broker address]
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: "false"
Navigate to the nodeapp
directory and open app.js
, the code for the Node.js input bindings sample app. Here you're exposing one API endpoint using express
. The API name must be identical to the component name which is specified in Kafka bindings component yaml. Then Dapr runtime will consume the event from sample
topic and then send the POST request to Node app with the event payload.
app.post('/sample-topic', (req, res) => {
console.log("Hello from Kafka!");
console.log(req.body);
res.status(200).send();
});
Navigate to the pythonapp
directory and open app.py
, the code for the output bindings sample app. This sends POST request to Dapr http endpoint http://localhost:3500/v1.0/bindings/<output_bindings_name>
with the event payload every second. This app uses sample-topic
bindings component name as <output_bindings_name>
. Then Dapr runtime will send the event to sample
topic which is specified in the above Kafka bindings component yaml.
dapr_url = "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port)
n = 0
while True:
n += 1
payload = { "data": {"orderId": n}, "operation": "create" }
print(payload, flush=True)
try:
response = requests.post(dapr_url, json=payload)
print(response.text, flush=True)
except Exception as e:
print(e)
time.sleep(1)
- Learn more about bindings in the Dapr docs
- How to create an event-driven app using input bindings
- How to send events to external systems using Output Bindings
- Explore additional quickstarts.