Skip to content

CPP SSE Client

Akram El Assas edited this page Jul 19, 2025 · 4 revisions

Prerequisites

Windows

  • Install MSYS2
  • Open the MSYS2 MinGW 64-bit shell and run:
pacman -Syu         # Update package database and core system
pacman -Su          # Finish update (may require closing/reopening terminal)

pacman -S mingw-w64-x86_64-gcc mingw-w64-x86_64-curl

Linux

On Debian/Ubuntu:

sudo apt update
sudo apt install libcurl4-openssl-dev build-essential

On Fedora:

sudo dnf install libcurl-devel gcc-c++

On Arch Linux:

sudo pacman -S curl

SSE Client Sample

Here is a sample C++ SSE client sse.cpp:

#include <iostream>
#include <string>
#include <curl/curl.h>
#include <nlohmann/json.hpp>
#include <thread>
#include <atomic>

using json = nlohmann::json;

static std::string baseUrl = "http://localhost:8000/api/v1";
static std::string username = "admin";
static std::string password = "wexflow2018";
static int workflowId = 41;

// Helper for libcurl response data
static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp)
{
  ((std::string *)userp)->append((char *)contents, size * nmemb);
  return size * nmemb;
}

// Perform HTTP POST with JSON payload, return response body as string
std::string httpPost(const std::string &url, const std::string &jsonPayload, const std::string &bearerToken = "")
{
  CURL *curl = curl_easy_init();
  if (!curl)
    throw std::runtime_error("Failed to init curl");

  std::string readBuffer;
  struct curl_slist *headers = nullptr;

  headers = curl_slist_append(headers, "Content-Type: application/json");
  if (!bearerToken.empty())
  {
    std::string authHeader = "Authorization: Bearer " + bearerToken;
    headers = curl_slist_append(headers, authHeader.c_str());
  }

  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
  curl_easy_setopt(curl, CURLOPT_POSTFIELDS, jsonPayload.c_str());
  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
  curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);

  CURLcode res = curl_easy_perform(curl);

  long httpCode = 0;
  curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);

  curl_slist_free_all(headers);
  curl_easy_cleanup(curl);

  if (res != CURLE_OK)
  {
    throw std::runtime_error(std::string("curl_easy_perform() failed: ") + curl_easy_strerror(res));
  }
  if (httpCode < 200 || httpCode >= 300)
  {
    throw std::runtime_error("HTTP error code: " + std::to_string(httpCode));
  }

  return readBuffer;
}

// Login to get token
std::string login(const std::string &user, const std::string &pass)
{
  json j;
  j["username"] = user;
  j["password"] = pass;
  j["stayConnected"] = false;

  std::string url = baseUrl + "/login";
  return json::parse(httpPost(url, j.dump()))["access_token"];
}

// Start workflow, returns jobId string
std::string startWorkflow(const std::string &token, int workflowId)
{
  std::string url = baseUrl + "/start?w=" + std::to_string(workflowId);
  // POST empty body for start
  std::string raw = httpPost(url, "", token);
  json j = json::parse(raw);
  return j.get<std::string>();
}

// SSE event handler
size_t sseWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
{
  size_t totalSize = size * nmemb;
  std::string chunk(ptr, totalSize);
  std::cout << ">>> SSE Chunk received:\n"
            << chunk << std::endl;

  static std::string buffer;
  buffer += chunk;

  size_t pos;
  while ((pos = buffer.find("\n\n")) != std::string::npos)
  {
    std::string eventBlock = buffer.substr(0, pos);
    buffer.erase(0, pos + 2);

    size_t dataPos = eventBlock.find("data: ");
    if (dataPos != std::string::npos)
    {
      std::string jsonData = eventBlock.substr(dataPos + 6);
      std::cout << "Received SSE data: " << jsonData << std::endl;

      try
      {
        auto j = json::parse(jsonData);
        if (j.contains("status") && j["status"] == "Done")
        {
          std::cout << "Workflow status Done received, exiting SSE loop." << std::endl;
          std::atomic<bool> *doneFlag = static_cast<std::atomic<bool> *>(userdata);
          doneFlag->store(true);
        }
      }
      catch (const std::exception &e)
      {
        std::cerr << "JSON parse error: " << e.what() << std::endl;
      }
    }
  }

  return totalSize;
}

// Listen to SSE stream until "Done" status
void listenToSse(const std::string &sseUrl, const std::string &token)
{
  CURL *curl = curl_easy_init();
  if (!curl)
  {
    throw std::runtime_error("Failed to init curl");
  }

  std::atomic<bool> done(false);

  struct curl_slist *headers = nullptr;
  headers = curl_slist_append(headers, "Accept: text/event-stream");
  headers = curl_slist_append(headers, ("Authorization: Bearer " + token).c_str());
  headers = curl_slist_append(headers, "Expect:"); // Disable 'Expect: 100-continue'

  curl_easy_setopt(curl, CURLOPT_URL, sseUrl.c_str());
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, sseWriteCallback);
  curl_easy_setopt(curl, CURLOPT_WRITEDATA, &done);
  curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);       // Keep connection open indefinitely
  curl_easy_setopt(curl, CURLOPT_TCP_KEEPALIVE, 1L); // Keep connection alive
  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);      // Required for multithreaded / Windows apps
  curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);       // Optional: enable debug output

  CURLcode res = curl_easy_perform(curl);
  if (res != CURLE_OK)
  {
    std::cerr << "SSE connection error: " << curl_easy_strerror(res) << std::endl;
  }

  curl_slist_free_all(headers);
  curl_easy_cleanup(curl);
}

int main()
{
  try
  {
    std::string token = login(username, password);
    std::string jobId = startWorkflow(token, workflowId);

    std::cout << "Workflow " << workflowId << " started. Job ID: " << jobId << std::endl;

    std::string sseUrl = baseUrl + "/sse/" + std::to_string(workflowId) + "/" + jobId;

    listenToSse(sseUrl, token);

    std::cout << "SSE listening finished. Exiting." << std::endl;
  }
  catch (const std::exception &e)
  {
    std::cerr << "Error: " << e.what() << std::endl;
  }
  return 0;
}

Run the Client

Download samples/clients/sse/cpp example from GitHub repo.

Windows

Open the MSYS2 MinGW 64-bit shell (not the default MSYS shell), and run:

cd cpp
g++ sse.cpp -o wexflow_sse.exe -Iinclude -lcurl -lws2_32 -lcrypt32
./wexflow_sse.exe

Linux

cd cpp
g++ sse.cpp -o wexflow_sse -Iinclude -lcurl -pthread
./wexflow_sse 
  1. Install Guide
  2. HTTPS/SSL
  3. Screenshots
  4. Docker
  5. Configuration Guide
    1. Wexflow Server
    2. Wexflow.xml
    3. Admin Panel
    4. Authentication
  6. Persistence Providers
  7. Getting Started
  8. Android App
  9. Local Variables
  10. Global Variables
  11. REST Variables
  12. Functions
  13. Cron Scheduling
  14. Command Line Interface (CLI)
  15. REST API Reference
    1. Introduction
    2. JWT Authentication
    3. Sample Clients
      1. C# Client
      2. JavaScript Client
      3. PHP Client
      4. Python Client
      5. Go Client
      6. Rust Client
      7. Ruby Client
      8. Java Client
      9. C++ Client
    4. Security Considerations
    5. Swagger
    6. Workflow Notifications via SSE
      1. C# SSE Client
      2. JavaScript SSE Client
      3. PHP SSE Client
      4. Python SSE Client
      5. Go SSE Client
      6. Rust SSE Client
      7. Ruby SSE Client
      8. Java SSE Client
      9. C++ SSE Client
    7. Endpoints
  16. Samples
    1. Sequential workflows
    2. Execution graph
    3. Flowchart workflows
      1. If
      2. While
      3. Switch
    4. Approval workflows
      1. Simple approval workflow
      2. OnRejected workflow event
      3. YouTube approval workflow
      4. Form submission approval workflow
    5. Workflow events
  17. Logging
  18. Custom Tasks
    1. Introduction
    2. General
      1. Creating a Custom Task
      2. Wexflow Task Class Example
      3. Task Status
      4. Settings
      5. Loading Files
      6. Loading Entities
      7. Need A Starting Point?
    3. Installing Your Custom Task in Wexflow
      1. .NET Framework 4.8 (Legacy Version)
      2. .NET 8.0+ (Stable Version)
      3. Referenced Assemblies
      4. Updating a Custom Task
      5. Using Your Custom Task
    4. Suspend/Resume
    5. Logging
    6. Files
    7. Entities
    8. Shared Memory
    9. Designer Integration
      1. Registering the Task
      2. Adding Settings
    10. How to Debug a Custom Task?
  19. Built-in Tasks
    1. File system tasks
    2. Encryption tasks
    3. Compression tasks
    4. Iso tasks
    5. Speech tasks
    6. Hashing tasks
    7. Process tasks
    8. Network tasks
    9. XML tasks
    10. SQL tasks
    11. WMI tasks
    12. Image tasks
    13. Audio and video tasks
    14. Email tasks
    15. Workflow tasks
    16. Social media tasks
    17. Waitable tasks
    18. Reporting tasks
    19. Web tasks
    20. Script tasks
    21. JSON and YAML tasks
    22. Entities tasks
    23. Flowchart tasks
    24. Approval tasks
    25. Notification tasks
    26. SMS tasks
  20. Run from Source
  21. Fork, Customize, and Sync
Clone this wiki locally