Pulsar Beam is an HTTP-based streaming and queueing system backed by Apache Pulsar.
Issues and PRs are welcome! Please email [email protected] for any inquiry or demo.
With Pulsar Beam, Pulsar becomes immediately usable from Windows and from any language with HTTP support.
It has a very small footprint with a 15MB docker image size.
Supports HTTP SSE streaming
REST API and endpoint swagger document is published at this link
This is the endpoint to POST a message to Pulsar.
/v2/firehose/{persistent}/{tenant}/{namespace}/{topic}
Valid values of {persistent} are p, persistent, np, and nonpersistent.
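As a rough illustration of the path layout above, the following Go sketch validates the {persistent} segment and prepares a POST request. The helper names, the tenant/namespace/topic values, and the base address http://localhost:8085 are assumptions made for the example; they are not part of Pulsar Beam itself.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/url"
)

// validPersistence lists the {persistent} values accepted by the endpoint.
var validPersistence = map[string]bool{
	"p": true, "persistent": true, "np": true, "nonpersistent": true,
}

// firehosePath builds /v2/firehose/{persistent}/{tenant}/{namespace}/{topic}.
// It is a hypothetical helper, not a function exported by Pulsar Beam.
func firehosePath(persistent, tenant, namespace, topic string) (string, error) {
	if !validPersistence[persistent] {
		return "", fmt.Errorf("invalid persistent value %q", persistent)
	}
	return "/v2/firehose/" + persistent + "/" + url.PathEscape(tenant) + "/" +
		url.PathEscape(namespace) + "/" + url.PathEscape(topic), nil
}

// newFirehoseRequest prepares a POST whose body is the message payload.
// baseURL is an assumed Pulsar Beam address such as http://localhost:8085.
func newFirehoseRequest(baseURL, path string, payload []byte) (*http.Request, error) {
	return http.NewRequest(http.MethodPost, baseURL+path, bytes.NewReader(payload))
}

func main() {
	path, err := firehosePath("p", "my-tenant", "my-namespace", "my-topic")
	if err != nil {
		panic(err)
	}
	req, err := newFirehoseRequest("http://localhost:8085", path, []byte("hello"))
	if err != nil {
		panic(err)
	}
	// The request is only printed here; send it with http.DefaultClient.Do(req)
	// after adding whatever headers your deployment requires.
	fmt.Println(req.Method, req.URL.String())
}
```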
These HTTP headers may be required to map to a Pulsar topic. When the Pulsar URL is not supplied, the PulsarBrokerURL value in the pulsar-beam.yml file is used.
This is the endpoint to GET messages from Pulsar as a consumer subscription.
/v2/sse/{persistent}/{tenant}/{namespace}/{topic}
Valid values of {persistent} are p, persistent, np, and nonpersistent.
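These HTTP headers may be required to map to a Pulsar topic. When the Pulsar URL is not supplied, the PulsarBrokerURL value in the pulsar-beam.yml file is used.
Query parameters select the subscription type (exclusive as default, shared, and failover) and the initial subscription position (latest as default and earliest).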
Polls a batch of messages from a topic, always starting from the earliest subscription position.
/v2/poll/{persistent}/{tenant}/{namespace}/{topic}
These HTTP headers may be required to map to a Pulsar topic. When the Pulsar URL is not supplied, the PulsarBrokerURL value in the pulsar-beam.yml file is used.
Query parameters select the subscription type (exclusive as default, shared, and failover).
Webhook registration is done via a REST API backed by a database of your choice, such as MongoDB, an in-memory cache, or Pulsar itself. Yes, you can use a compacted Pulsar topic as a database table to perform CRUD. The backend is selected with the configuration parameter "PbDbType": "inmemory" in the pulsar_beam.yml file or with the env variable PbDbType.
The management REST API has this endpoint. Here is the swagger document
/v2/topic
Pulsar Beam can decode and authenticate JWTs generated by Pulsar. Webhook management requires a subject in the JWT that matches the tenant name in the topic's full name. pulsar-admin token can be used to generate such a token.
Pulsar Beam requires the same public and private keys that Pulsar uses to generate and verify JWTs. These public and private keys should be specified in the config to be loaded.
To disable JWT authentication, set the parameter HTTPAuthImpl in the config file or env variable to noauth.
If a webhook's response contains a body and three headers, namely Authorization for the Pulsar JWT, TopicFn for a fully qualified topic name, and PulsarUrl, the beam server will send the body as a new message to the Pulsar topic specified by TopicFn and PulsarUrl.
Both json and yml formats are supported for the configuration file. The configuration parameters are specified by config.go. Every parameter can be overridden by an environment variable with the same name.
To offer high performance and a division of responsibility, the webhook and receiver endpoints can run independently with -mode broker or -mode receiver. By default, the server runs in a hybrid mode with all features running in the same process.
The docker image can be pulled from Docker Hub.
$ sudo docker pull kafkaesqueio/pulsar-beam
Here are the steps to build the docker image and run the docker container with a file-based configuration.
$ sudo docker build -t pulsar-beam .
pulsar_beam.yml and the private and public key files can be mounted, and the configuration file passed in through the env variable PULSAR_BEAM_CONFIG. The certificate is required to connect to Pulsar with TLS enabled.
$ sudo docker run -d -it -v /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem:/etc/ssl/certs/ca-bundle.crt -p 8085:8085 --name=pbeam-server pulsar-beam
gops is built into the docker image for troubleshooting purposes.
Pulsar Beam can be deployed within the same cluster as Pulsar. This helm chart deploys a webhook broker in its own pod. The HTTP receiver endpoint and REST API are deployed as a container within the Pulsar proxy pod, which offers scalability with multiple replicas.
Clone the repo at your gopath src/github.com/kafkaesque-io/pulsar-beam folder.
Install golint.
$ go install golang.org/x/lint/golint@latest
$ cd src
$ golint ./...
There are two scripts used for CI. You might want to run them in your local environment before submitting a PR. The CI script runs linting, go vet, and go build. The code coverage script runs the unit tests and tallies up the code coverage.
Steps to start the web server:
$ cd src
$ go run main.go
There are scripts under the ./scripts folder to run code analysis, vetting, compilation, unit tests, and code coverage manually, since all of these are part of the CI checks run by GitHub Actions.
One end-to-end test is under ./src/e2e/e2etest.go, which performs the following steps in order:
Since the setup is non-trivial, involving Pulsar Beam, a Cloud Function or webhook, the test tool, and Pulsar itself with SSL, we recommend taking advantage of the free plan at kesque.com as the Pulsar server. For the Cloud Function, we have verified that a GCP Function, Azure Function, or AWS Lambda will suffice in the e2e flow.
Steps to run the unit tests:
$ cd src/unit-test
$ go test -v .