Particle Projects

May 2020

Particle Webhook to InfluxData


InfluxData provides a fully managed and hosted cloud system for data ingestion, storage, and analytics.  The version 2.0 system can manage multiple users (customers) within a single subscription, and their data can be kept segregated using "bucket" categorization.  Charting, alerting, and high availability are supported.  Pricing is very reasonable.

Getting Started

Create a free account and log in to your fully managed and hosted InfluxDB version 2.0 instance.

Determine your organization ID (orgID) by looking at the URL after you log in.  It is the string following /orgs/.

Get the API URL by clicking on the Client Libraries tab.  

Create a token by clicking on the Tokens tab.  

A default bucket is created for you; it can be found by clicking on the Buckets tab and looking at the bottom of the list.  Any bucket with the "_" prefix is a system bucket and should not be touched.  You need to determine your bucket ID, since the APIs work better with the ID than with the bucket name.  In the InfluxData console, the ID is listed to the right of each bucket name in the bucket list.

You should be able to get the bucket ID from the API endpoint https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/buckets, but it did not work for me (it returned an empty list):

GET:


https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/buckets

Response:


{
  "links": {
    "self": "/api/v2/buckets?descending=false&limit=20&offset=0"
  },
  "buckets": []
}

Once you have your orgID and your bucket ID, you can do much more.  First, confirm you can retrieve details about a specific bucket with a GET request to /buckets/{bucketID}.


https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/buckets/your-bucket-id
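
Every InfluxDB 2.0 API request must include your token in an Authorization header.  A sketch of the request above with the header included; the host, bucket ID, and token are placeholders you must replace:


GET https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/buckets/your-bucket-id
Authorization: Token your-api-token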

If that works, then you are ready to write data to your bucket using a POST request.  InfluxDB uses line protocol to write data points: a text-based format that provides the measurement, tag set, field set, and timestamp of a data point.  The measurement and field set are required; the tag set and timestamp are optional.  For Particle device applications, it is usually best to minimize the data payload to just a measurement and field set, unless the metadata is crucial.  InfluxData adds its own server timestamp to data written to a bucket if a timestamp is not supplied.

Line Protocol Syntax:


<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>]

Example with metadata (tag set):


myMeasurement,tag1=value1,tag2=value2,tag3=value3 fieldKey="fieldValue"

Minimal IoT payload with one measurement and two field values:


Boron_A A1=1.9,A2=2.3

The line protocol will not accept the carriage return character \r or the carriage return + newline sequence \r\n as a line terminator.

When writing data to a bucket, a 204 (No Content) response code, rather than 200, indicates success.  When writing multiple lines (measurements), each line must end with the newline character \n, per the InfluxDB line protocol.
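
Putting the pieces together, a raw write request to the v2 API looks roughly like the sketch below; the host, orgID, bucket ID, and token are placeholders, and precision=s declares that any supplied timestamps would be in seconds.  The body here carries two measurement lines, each terminated by \n.


POST https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/write?orgID=your-org-id&bucket=your-bucket-id&precision=s
Authorization: Token your-api-token
Content-Type: text/plain

Boron_A A1=1.9,A2=2.3
Boron_B A1=2.0,A2=2.2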

Dashboard

After you have written data to a bucket, you can explore the easy-to-configure dashboards to visualize your data.  The Data Explorer provides powerful, easy-to-use tools for aggregating your raw data before feeding it to your dashboard cells.
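
The Data Explorer also accepts hand-written Flux in its Script Editor.  A minimal query sketch, assuming a bucket named my-bucket and the Boron_A measurement used by the firmware example below, which averages each field over one-minute windows:


from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "Boron_A")
  |> aggregateWindow(every: 1m, fn: mean)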

Particle Webhook to InfluxData

Create a Particle webhook for InfluxData, substituting your InfluxData server URL, organization ID, bucket ID, and token, and set the request body so that only the published event data is sent:


{{{PARTICLE_EVENT_VALUE}}}
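
The same webhook can alternatively be defined as a custom JSON template in the Particle Console or with the Particle CLI.  A sketch with placeholder values; the host, orgID, bucket ID, and token must be replaced with your own, and the event name must match the one published by the firmware:


{
    "event": "influxdata",
    "url": "https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/write?orgID=your-org-id&bucket=your-bucket-id&precision=s",
    "requestType": "POST",
    "noDefaults": true,
    "rejectUnauthorized": true,
    "headers": {
        "Authorization": "Token your-api-token",
        "Content-Type": "text/plain"
    },
    "body": "{{{PARTICLE_EVENT_VALUE}}}"
}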

The Boron/Argon code example below publishes a minimal-size message containing two analog input values to the influxdata webhook created above, which forwards the data to InfluxData.


/*
 * Project        InfluxData.ino
 * Description:   Publish multiple analog input values to InfluxData
 * Author:        Mark W Kiehl / Mechatronic Solutions LLC
 * Date:          July 2020
 * REVISION:      0  
 */

#include "Particle.h"

/////////////////////////////////////////////////////////////////////////
// analog inputs

// Timer for publishing analog input values to the Particle Cloud
const unsigned long TIMER_PUBLISH_AI_INTERVAL_MS = 10000; // Min is 1000 ms
unsigned long timerPublishLastAI = 0;  

// Timer for publishing analog input alarm events to the Particle Cloud
const unsigned long TIMER_ALARM_PUBLISH_AI_INTERVAL_MS = 15000; // Min is 1000 ms
unsigned long timerAlarmPublishLastAI = 0;  

unsigned long publish_error_count = 0;

// Bundle all of the analog input data into a structure.
const byte AI_COUNT = 2;
struct analog_inputs_t {
  byte pin;
  String pinName;
  unsigned int ADC;
  unsigned int ADC_offset;
  unsigned long ai_samples;
  unsigned int ADCmin;
  unsigned int ADCmax;
  double fs_val;
  String fs_unit;
  double mV_to_fs;
  double fs_low_limit;
  double fs_high_limit;
  unsigned long timer_low_ms;
  unsigned long timer_high_ms;
  unsigned long timer_limit_low_ms;
  unsigned long timer_limit_high_ms;
  boolean alarm;
  unsigned long alarm_last_ms;
} arr_ai[AI_COUNT];
/////////////////////////////////////////////////////////////////////////


void setup() {
  pinMode(D7, OUTPUT);
  digitalWrite(D7, LOW);

  Serial.begin();
  waitFor(Serial.isConnected, 30000);
  delay(1000);
  Serial.printlnf("System version: %s", (const char*)System.version());

  // Initialize arr_ai
  arr_ai[0].pin = A1;
  arr_ai[0].pinName = "A1";
  arr_ai[1].pin = A2;
  arr_ai[1].pinName = "A2";
  for (int i=0; i<AI_COUNT; i++) {
    pinMode(arr_ai[i].pin, INPUT);
    arr_ai[i].ADC = 0;
    arr_ai[i].ADCmin = 4096;
    arr_ai[i].ADCmax = 0;
    arr_ai[i].fs_val = 0.0;                 // Updated during each ReadAnalogInputs()
    arr_ai[i].alarm = false;
    arr_ai[i].alarm_last_ms = millis();
    arr_ai[i].timer_low_ms = millis();  
    arr_ai[i].timer_high_ms = millis();  
    // Move any of the assignments below to above the for() loop if you need to assign unique values to each analog input.
    arr_ai[i].ADC_offset = 1;               // Calibration correction
    arr_ai[i].mV_to_fs = 0.001;             // Conversion factor to apply to mV analog input reading to full scale
    arr_ai[i].fs_unit = "V";                // Unit for the analog input values in full scale
    arr_ai[i].fs_low_limit = 0.543;         // Full scale values less than fs_low_limit will trigger .alarm
    arr_ai[i].fs_high_limit = 3.300;        // Full scale values greater than fs_high_limit will trigger .alarm
    arr_ai[i].timer_limit_low_ms = 10000;   // Minimum time the full scale value must remain below .fs_low_limit in order to trigger an alarm
    arr_ai[i].timer_limit_high_ms = 10000;  // Minimum time the high value must persist in order to trigger an alarm
  }
  timerPublishLastAI = millis();
  timerAlarmPublishLastAI = millis();

} // setup()


void loop() {

  ReadAnalogInputs();
  PublishAiVals();
  ProcessAiAlarms();

} // loop()


/////////////////////////////////////////////////////////////////////////
// Analog inputs

void ReadAnalogInputs() {
  // 12 bit ADC (values between 0 and 4095 or 2^12) or a resolution of 0.8 mV
  // Hardware minimum sample time to read one analog value is 10 microseconds. 
  // Raw ADC values are adjusted by a calibration factor arr_ai[n].ADC_offset
  // that is determined by bench testing against precision voltage reference. 
  // Update .ADC with the cumulative ADC value (to calculate average over the publish interval).
  // Update .fs_val with the current ADC value and check for alarm conditions.
  for (int i=0; i<AI_COUNT; i++) {
    unsigned int ADC = analogRead(arr_ai[i].pin) + arr_ai[i].ADC_offset;
    arr_ai[i].ADC += ADC;
    arr_ai[i].fs_val = double(ADC) * 3300.0 / 4096.0 * arr_ai[i].mV_to_fs;
    arr_ai[i].ai_samples++;

    if (ADC < arr_ai[i].ADCmin) arr_ai[i].ADCmin = ADC;
    if (ADC > arr_ai[i].ADCmax) arr_ai[i].ADCmax = ADC;
    
    // Reset the low / high value timers & alarm timer if limits are not exceeded and no alarm exists
    if (arr_ai[i].alarm == false) {
      if (arr_ai[i].fs_val > arr_ai[i].fs_low_limit) {
        arr_ai[i].timer_low_ms = millis();
        arr_ai[i].alarm_last_ms = millis();
      }
      if (arr_ai[i].fs_val < arr_ai[i].fs_high_limit) {
        arr_ai[i].timer_high_ms = millis();
        arr_ai[i].alarm_last_ms = millis();
      }
    }

    // Check for an alarm condition (low voltage for longer than .timer_limit_low_ms, or high voltage for longer than .timer_limit_high_ms)
    if (arr_ai[i].fs_val < arr_ai[i].fs_low_limit && millis() - arr_ai[i].timer_low_ms > arr_ai[i].timer_limit_low_ms) {
      arr_ai[i].alarm = true;
    }
    if (arr_ai[i].fs_val > arr_ai[i].fs_high_limit && millis() - arr_ai[i].timer_high_ms > arr_ai[i].timer_limit_high_ms) {
      arr_ai[i].alarm = true;
    }

  } // for 
}  // ReadAnalogInputs()


void PublishAiVals() {
  if (timerPublishLastAI > millis())  timerPublishLastAI = millis();  // Handle millis() rollover
  if ((millis() - timerPublishLastAI) > TIMER_PUBLISH_AI_INTERVAL_MS) {

    String eventVal = "Boron_A ";
    //Boron_A A1=1.9,A2=2.3
    for (int i=0; i<AI_COUNT; i++) {
      // Calculate the AVERAGE full scale analog input value measured over arr_ai[#].ai_samples since the last publish.
      //double fs_val = double(arr_ai[i].ADC) / double(arr_ai[i].ai_samples) * 3300.0 / 4096.0 * arr_ai[i].mV_to_fs;
      // Calculate the MINIMUM full scale analog input value measured over arr_ai[#].ai_samples since the last publish.
      //double fs_val = double(arr_ai[i].ADCmin) * 3300.0 / 4096.0 * arr_ai[i].mV_to_fs;
      // Calculate the MAXIMUM full scale analog input value measured over arr_ai[#].ai_samples since the last publish.
      double fs_val = double(arr_ai[i].ADCmax) * 3300.0 / 4096.0 * arr_ai[i].mV_to_fs;
      String line = String::format("%s=%0.1f", arr_ai[i].pinName.c_str(), fs_val);
      eventVal.concat(line.c_str());
      if (i < AI_COUNT-1)
        eventVal.concat(",");
      arr_ai[i].ADC = 0;
      arr_ai[i].ADCmin = 4096;
      arr_ai[i].ADCmax = 0;
      arr_ai[i].ai_samples = 0;
    } // for
    // Optionally append a Unix timestamp in seconds (requires precision=s on the write URL)
    //eventVal.concat(String::format(" %ld", Time.now()).c_str());
    Serial.printlnf("'%s'", eventVal.c_str());
    // 'Boron_A A1=1.9,A2=2.1'
       
    byte PubResult = Particle.publish("influxdata", eventVal, PRIVATE);
    if (PubResult == 1) 
      publish_error_count = 0;
    else
      publish_error_count++;
    
    timerPublishLastAI = millis();
  } // timer
} // PublishAiVals()


void ProcessAiAlarms() {
  // Publish any analog input alarm conditions to the Particle Cloud, choosing 
  // the oldest alarm indicated by arr_ai[#].alarm & arr_ai[#].alarm_last_ms.
  if (timerAlarmPublishLastAI > millis())  timerAlarmPublishLastAI = millis();  // Handle millis() rollover
  if ((millis() - timerAlarmPublishLastAI) > TIMER_ALARM_PUBLISH_AI_INTERVAL_MS) {
    // Find the index for the oldest alarm, indicated by arr_ai[#].alarm & arr_ai[#].alarm_last_ms
    unsigned long oldest_ms = 0;
    byte oldest_alarm = 255;
    for (int i=0; i<AI_COUNT; i++) {
      if (arr_ai[i].alarm && millis() - arr_ai[i].alarm_last_ms > oldest_ms) {
        oldest_ms = millis() - arr_ai[i].alarm_last_ms;
        oldest_alarm = i;
      }
    }  // for
    // Publish the oldest alarm to the Particle Cloud
    if (oldest_alarm == 255) {
      Serial.printlnf("%u No ai alarms to publish", millis());
    } else {
      // Publish the oldest alarm
      if (PLATFORM_ID == PLATFORM_XENON) {
        if (arr_ai[oldest_alarm].fs_val < arr_ai[oldest_alarm].fs_low_limit) {
          Serial.printlnf("%u, %s LOW (%.4f < %.4f %s) for more than %u ms", millis(), arr_ai[oldest_alarm].pinName.c_str(), arr_ai[oldest_alarm].fs_val, arr_ai[oldest_alarm].fs_low_limit, arr_ai[oldest_alarm].fs_unit.c_str(), arr_ai[oldest_alarm].timer_limit_low_ms);
          arr_ai[oldest_alarm].timer_low_ms = millis();  
        } else if (arr_ai[oldest_alarm].fs_val > arr_ai[oldest_alarm].fs_high_limit) {
          Serial.printlnf("%u, %s HIGH (%.4f > %.4f %s) for more than %u ms", arr_ai[oldest_alarm].pinName.c_str(), arr_ai[oldest_alarm].fs_val, arr_ai[oldest_alarm].fs_high_limit, arr_ai[oldest_alarm].fs_unit.c_str(), arr_ai[oldest_alarm].timer_limit_high_ms);
          arr_ai[oldest_alarm].timer_high_ms = millis();  
        }
        arr_ai[oldest_alarm].alarm = false;
      } else {
        if (arr_ai[oldest_alarm].fs_val < arr_ai[oldest_alarm].fs_low_limit) {
          Serial.printlnf("%u, %s LOW (%.4f < %.4f %s) for more than %u ms", millis(), arr_ai[oldest_alarm].pinName.c_str(), arr_ai[oldest_alarm].fs_val, arr_ai[oldest_alarm].fs_low_limit, arr_ai[oldest_alarm].fs_unit.c_str(), arr_ai[oldest_alarm].timer_limit_low_ms);
          arr_ai[oldest_alarm].timer_low_ms = millis();  
          byte iPubResult = Particle.publish(arr_ai[oldest_alarm].pinName, String::format("LOW %.4f %s for more than %u ms", arr_ai[oldest_alarm].fs_val, arr_ai[oldest_alarm].fs_unit.c_str(), arr_ai[oldest_alarm].timer_limit_low_ms), PRIVATE);
          if (iPubResult == 1) arr_ai[oldest_alarm].alarm = false;
        } else if (arr_ai[oldest_alarm].fs_val > arr_ai[oldest_alarm].fs_high_limit) {
          Serial.printlnf("%s HIGH (%.4f > %.4f %s) for more than %u ms", arr_ai[oldest_alarm].pinName.c_str(), arr_ai[oldest_alarm].fs_val, arr_ai[oldest_alarm].fs_high_limit, arr_ai[oldest_alarm].fs_unit.c_str(), arr_ai[oldest_alarm].timer_limit_high_ms);
          arr_ai[oldest_alarm].timer_high_ms = millis();  
          byte iPubResult = Particle.publish(arr_ai[oldest_alarm].pinName, String::format("HIGH %.4f %s for more than %u ms", arr_ai[oldest_alarm].fs_val, arr_ai[oldest_alarm].fs_unit.c_str(), arr_ai[oldest_alarm].timer_limit_high_ms), PRIVATE);
          if (iPubResult == 1) arr_ai[oldest_alarm].alarm = false;
        }
      } // PLATFORM_ID
      arr_ai[oldest_alarm].alarm_last_ms = millis();
    } // oldest_alarm
    timerAlarmPublishLastAI = millis();
  } // timer
} // ProcessAiAlarms()

/////////////////////////////////////////////////////////////////////////


 

Related Tutorials

InfluxData Integration from Particle tutorials

Particle & InfluxData by InfluxData

Integrating Particle with InfluxDB Cloud by davidgs

InfluxDB Cloud 2.0 Documentation Links

Getting Started

InfluxDB API

Write time series data into InfluxDB

 


Do you need help developing or customizing an IoT product for your needs?  Send me an email requesting a free one-hour phone / web share consultation.

 

The information presented on this website is for the author's use only.   Use of this information by anyone other than the author is offered as guidelines and non-professional advice only.   No liability is assumed by the author or this web site.