npm install @upstash/kafka
import { Kafka } from "@upstash/kafka"; const kafka = new Kafka({ url: "<UPSTASH_KAFKA_REST_URL>", username: "<UPSTASH_KAFKA_REST_USERNAME>", password: "<UPSTASH_KAFKA_REST_PASSWORD>", });
const p = kafka.producer(); const message = { hello: "world" }; // Objects will get serialized using `JSON.stringify` const response = await p.produce("TOPIC", message); const response2 = await p.produce("TOPIC", message, { partition: 1, timestamp: 4567, key: "KEY", headers: [{ key: "TRACE-ID", value: "32h67jk" }], });
const p = kafka.producer(); const res = await p.produceMany([ { topic: "TOPIC", value: "MESSAGE", // ...options }, { topic: "TOPIC-2", value: "MESSAGE-2", // ...options }, ]);
const c = kafka.consumer(); const messages = await c.consume({ consumerGroupId: "group_1", instanceId: "instance_1", topics: ["test.topic"], autoOffsetReset: "earliest", });
consume
const consumerGroupId = "mygroup"; const instanceId = "myinstance"; const topic = "my.topic"; const c = kafka.consumer(); const messages = await c.consume({ consumerGroupId, instanceId, topics: [topic], autoCommit: false, }); for (const message of messages) { // message handling logic await c.commit({ consumerGroupId, instanceId, offset: { topic: message.topic, partition: message.partition, offset: message.offset, }, }); }
const c = kafka.consumer(); const messages = await c.fetch({ topic: "greeting", partition: 3, offset: 42, timeout: 1000, });
Was this page helpful?