...


Q: ListenHTTP -

I have built most of our production flow in NiFi. Part of the data is received from thousands of distributed components via the ListenHTTP processor. I now need to add some automatic registration for these components (this assigns some ID to the component) and enable authentication using the generated ID. Is this possible to implement with the current version? The registration could happen outside (maybe using Play), but the file reception should happen in NiFi in order to initiate the flow.

  • A: The ListenHTTP processor isn't general-purpose enough to support custom-designed authentication and authorization schemes. You could use it to restrict who can provide data through HTTPS and specific certificate DN patterns, or simply accept whatever data arrives and have a follow-on processor in the flow decide whether to drop it, since that processor knows and trusts the source. If you want to block data as a client component tries to feed it to NiFi using a custom scheme, you'll probably want to look at a custom processor, or wire together the HandleHttpRequest and HandleHttpResponse processors. With those you can build a rather custom and powerful API/interface and layer your own authentication and authorization scheme on top of HTTP/S. If you understand HTTP well, you can essentially construct a web service visually, though it requires significant knowledge to get the most out of it.
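As a sketch of the kind of ID-based check such a custom flow would implement (the header name and registry below are illustrative assumptions, not NiFi APIs):

```python
# Hypothetical sketch: the kind of ID-based authorization check a custom
# HandleHttpRequest/HandleHttpResponse flow (or follow-on processor) could apply.
# The header name and registry are illustrative assumptions, not NiFi APIs.
REGISTERED_IDS = {"component-001", "component-002"}  # assumed registration store

def is_authorized(headers: dict) -> bool:
    # Assume each registered client sends its assigned ID in 'X-Component-Id'.
    return headers.get("X-Component-Id") in REGISTERED_IDS
```

The registration service (e.g. the external Play application mentioned in the question) would populate the registry; the flow only needs to consult it per request.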

Q: GetSFTP -

I'm setting up a GetSFTP processor to run every morning to download files expected in a regular location that is named based on the previous day's date, e.g. /logs/2015-09-08. I'm trying to set the GetSFTP processor "Remote Path". What NiFi expression language statement should I use?

  • A: Try this expression in your Remote Path:
    /logs/${now():toNumber():minus(86400000):format('yyyy-MM-dd')}
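For reference, here is a Python sketch of what that expression computes: subtract 86,400,000 ms (one day) from the current time, then format as yyyy-MM-dd. The function name is illustrative, not part of NiFi.

```python
from datetime import datetime, timedelta

# Mirrors the NiFi expression:
#   /logs/${now():toNumber():minus(86400000):format('yyyy-MM-dd')}
def previous_day_path(now: datetime) -> str:
    # 86400000 ms = 24 hours; format matches 'yyyy-MM-dd'
    return "/logs/" + (now - timedelta(milliseconds=86400000)).strftime("%Y-%m-%d")

print(previous_day_path(datetime(2015, 9, 9)))  # → /logs/2015-09-08
```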

Q: GetHTTP & InvokeHTTP -

The GetHTTP processor works fine with a static filename when getting files from a website. However, I have a use case where I need to download a file daily and the filename is today's date, e.g. 09222015.zip. Since the URL property of GetHTTP does not support expression language, I cannot do something like http://example.com/${now():format('MMddyyyy')}.zip. Is there a way I can specify the filename dynamically, or another processor that would make this work? Please advise.

  • A: InvokeHTTP will allow you to use the Expression Language to do an HTTP GET.
    It works a bit differently than a GetHTTP, though, because GetHTTP is a "Source Processor" whereas InvokeHTTP needs to
    be fed a FlowFile in order to do anything. So you can use GenerateFlowFile as a source and have it generate a 
    0 byte FlowFile (set the File Size property to "0 B"). Then just connect GenerateFlowFile to InvokeHTTP.
    Having said that, it is a bit awkward to have to use a GenerateFlowFile to trigger InvokeHTTP to run, so
    there is a ticket (NIFI-993) to change the GetHTTP to evaluate the Expression Language for the URL
    property. That should be fixed in NiFi version 0.4.0. In the meantime, though, GenerateFlowFile -> InvokeHTTP should provide you with
    the capability you're looking for.
  • In addition, if you want to retain the filename of the file you are ingesting, you can use an UpdateAttribute processor before the InvokeHTTP: GenerateFlowFile -> UpdateAttribute -> InvokeHTTP. In UpdateAttribute, create a property called 'filename' and give it a value of ${now():format('MMddyyyy')}.zip. Then, in the InvokeHTTP, reference that attribute with the Expression Language in the URL: http://example.com/${filename}. Now you'll get your content and will have retained the filename you used to pull it.
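The UpdateAttribute/InvokeHTTP pattern above can be sketched in Python as follows (the base URL is the example.com placeholder from the question; the function is illustrative, not NiFi code):

```python
from datetime import datetime

# Sketch of the pattern: 'filename' mirrors ${now():format('MMddyyyy')}.zip,
# and the URL is built by referencing that attribute.
def daily_url(now: datetime, base: str = "http://example.com"):
    filename = now.strftime("%m%d%Y") + ".zip"  # NiFi: ${now():format('MMddyyyy')}.zip
    return filename, f"{base}/{filename}"

print(daily_url(datetime(2015, 9, 22)))
# → ('09222015.zip', 'http://example.com/09222015.zip')
```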

Q: GetTwitter -

I am trying to use the GetTwitter processor to pull in tweets with a certain keyword. Do I need to pull in the tweets and then use the RouteOnAttribute processor?

  • A: You should not need to use a RouteOnAttribute to pull out only messages that you are interested in. The GetTwitter processor has a property named "Twitter Endpoint." The default value is "Sample Endpoint." This endpoint just returns a sample of approximately 1% of the Twitter data feed, without applying any sort of filters. If you want to receive only tweets that have the word "foo" (for example), you should set the Twitter Endpoint to "Filter Endpoint." Then, in the "Terms to Filter On" property, set the value to "foo" and you should be good to go. If you want to only accept tweets that are in English, you can also set the "Languages" property to "EN". The key, though, is to ensure that you are using the Filter Endpoint; otherwise, these settings will be ignored. Should you wish to perform additional routing/filtering, you can certainly use additional processors to do so. For example, you could use EvaluateJsonPath to pull out specific fields of interest and then use RouteOnAttribute to route based on those fields.
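The optional EvaluateJsonPath + RouteOnAttribute follow-up can be illustrated in Python like this (the 'lang' and 'text' field names follow Twitter's JSON; this is not NiFi code):

```python
import json

# Illustrative equivalent of EvaluateJsonPath (extract fields of interest)
# followed by RouteOnAttribute (route on those fields). Not NiFi code.
def route(tweet_json: str) -> str:
    tweet = json.loads(tweet_json)
    if tweet.get("lang") == "en" and "foo" in tweet.get("text", ""):
        return "matched"
    return "unmatched"
```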

Q: PutHDFS -

Is there any way to bypass writing FlowFiles on disk and directly pass those files to HDFS as is? Also, if the files are compressed (zip/gzip), can we store the files on HDFS as uncompressed?

  • A: There is no way to bypass having NiFi take a copy of the data, and this is by design. NiFi is helping you formulate a graph of dataflow requirements from given source(s) through given processing steps and ultimately driving data into given destination systems. As a result, it takes on the challenge of handling transactionality of each interaction and the buffering and backpressure to deal with the realities of different production/consumption patterns. As for storing files on HDFS as uncompressed - yes: both of those formats (zip/gzip) are supported in NiFi out of the box. Simply run the data through the appropriate processor prior to PutHDFS: UnpackContent for zip, or CompressContent for gzip (in CompressContent, choose the decompress mode setting).
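To illustrate what those two processors do to the FlowFile content before it reaches PutHDFS, here is a minimal Python sketch (illustrative only, not NiFi code):

```python
import gzip
import io
import zipfile

# Illustrative only: roughly what CompressContent (decompress mode) and
# UnpackContent do to FlowFile content before it reaches PutHDFS.
def decompress_gzip(data: bytes) -> bytes:
    # CompressContent in decompress mode: gzip bytes in, raw bytes out
    return gzip.decompress(data)

def unpack_zip(data: bytes) -> dict:
    # UnpackContent: one zip archive in, one entry per contained file out
    with zipfile.ZipFile(io.BytesIO(data)) as zf:
        return {name: zf.read(name) for name in zf.namelist()}
```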

Is there any way to connect to a MapR cluster using the HDFS compatible API?

  • A: Yes, but it requires compiling NiFi from source. The process is straightforward:
  1. Install mapr-client
  2. Configure mapr-client as per the MapR documentation
  3. Confirm everything is working by using `maprlogin kerberos` or `maprlogin password` followed by `hadoop fs -ls -d`
  4. Compile using the MapR profile:
    -Pmapr -Dhadoop.version=<mapr_artifact_version_as_per_mapr_documentation>, where the artifact version matches your MapR release
  5. Create a core-site.xml with the following content:

    <configuration>
      <property>
        <name>fs.defaultFS</name>
        <value>maprfs:///</value>
      </property>
    </configuration>

  6. Create the PutHDFS processor and point it to the core-site.xml above

    If the cluster is running in secure mode (Kerberos or MapR-SASL):

  7. Modify bootstrap.conf so that it contains:

    java.arg.15=-Djava.security.auth.login.config=/opt/mapr/conf/mapr.login.conf -Dhadoop.login=hybrid

  8. Log in as the user that is running NiFi (e.g. nifi) and perform a `maprlogin password`
  9. Using the same account, try once more to run `hadoop fs -ls`
  10. If all works, start the processor
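Taken together, the command-line portion of these steps looks roughly like the following session. This is a sketch of the steps above, not a verified recipe; the artifact version is the placeholder from step 4, and the build goal is an assumed standard Maven invocation.

```shell
# Steps 1-2: install and configure mapr-client per the MapR documentation.

# Step 3: verify cluster connectivity before touching NiFi.
maprlogin password            # or: maprlogin kerberos
hadoop fs -ls -d

# Step 4: build NiFi from source with the MapR profile (assumed Maven goal).
mvn clean install -Pmapr -Dhadoop.version=<mapr_artifact_version_as_per_mapr_documentation>

# Steps 8-9 (secure mode): as the user running NiFi, obtain a ticket and re-check.
su - nifi
maprlogin password
hadoop fs -ls
```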


NOTES

  1. In real-world environments the maprticket assigned to the NiFi user must be a long-lived ticket, such as a service ticket
  2. Presently it doesn't seem to be possible to use pure Kerberos settings. Work is ongoing to identify how to solve this issue

Connections


Q: If I have FlowFiles stuck in my queue, how can I clear them out?

  • A: Once the Apache NiFi 0.4.0 release is available, you will be able to simply right-click the connection and select 'Purge Queue'. For older versions of NiFi, if you are testing a flow and do not care what happens to the test data stuck in a connection queue, you can reconfigure the connection and temporarily set the FlowFile Expiration to something like 1 sec so that the FlowFiles currently in the queue expire. Then reset it to 0 sec (which means never expire). Note that you must stop the processors on either side of a connection before you can edit its settings.

...