What are streams and buffers in Node.js

The topic of this chapter is streams in Node.js. We will take the time to understand this topic well and in detail because, on the one hand, streams barely appear in ordinary browser-side JavaScript development, and on the other hand, a solid command of streams is necessary for competent server-side development, since a stream is the universal way to work with data sources, and they are used everywhere.

There are two main types of streams.

The first type: stream.Readable, for reading.
stream.Readable is a built-in class that implements readable streams. As a rule it is not used by itself; its descendants are used instead. In particular, there is fs.ReadStream for reading from a file. Another example is the visitor request in server.on('request', ... req ...): the special object we saw earlier under the name req, the first argument of the request handler, is a readable stream.

The second type: stream.Writable, for writing.
stream.Writable is the generic way of writing data, and here too stream.Writable itself is not usually used directly; its descendants are used:
...to a file: fs.WriteStream
...in response to the visitor: server.on('request', ... res ...)

There are some other types of streams, but the most popular are these two and their derivatives.

The best way to understand streams is to see how they work in practice. So we'll start by using fs.ReadStream to read a file.

fs.js

JavaScript

var fs = require("fs"); varstream = new fs.ReadStream(__filename); stream.on("readable", function()( var data = stream.read(); console.log(data); )); stream.on("end", function()( console.log("THE END"); ));

var fs = require("fs") ;

//fs.ReadStream inherits from stream.Readable

var stream = new fs . ReadStream(__filename) ;

console. log(data);

} ) ;

console. log("THE END") ;

} ) ;

So here I include the fs module and create a stream. A stream is a JavaScript object that receives information about a resource, in this case the path to the file (__filename), and knows how to work with that resource. fs.ReadStream implements the standard reading interface described by the stream.Readable class. Let's look at it on the diagram.

When a stream object is created with new stream.Readable, it connects to the data source, in our case a file, and tries to start reading from it. Once it has read something, it emits the readable event, which means that the data has been read and is sitting in the stream's internal buffer, from which we can get it with a read() call. Then we can do something with that data, wait for the next readable event if needed, and so on. When the data source runs dry (there are of course sources that never run dry, for example random number generators, but a file has a finite size), the stream emits the end event, which means there will be no more data. Also, at any stage of working with the stream, we can call its destroy() method. It means that we no longer need the stream, so it can be closed, along with the underlying data source, and everything cleaned up completely.

And now back to the source code. So here we are creating a ReadStream


and it immediately wants to open the file. But "immediately" here does not mean on the same line at all, because, as we remember, all input-output operations go through libuv, and libuv is designed so that all I/O event handlers fire asynchronously, on following iterations of the event loop, that is, strictly after all the current JavaScript has finished running. This means I can attach all the handlers without any problems and know for sure they will be set before the first chunk of data is read. I run this code and see what is displayed in the console.

First the readable event fired and output the data. For now it is just a raw buffer, but I can convert it to a string in UTF-8 encoding with an ordinary toString call.

Another option is to specify the encoding directly when opening the stream

then the conversion will be automatic and we don't need toString().
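For reference, a minimal sketch of both options (the file read here is just this script itself):

var fs = require("fs");

// Option 1: read raw buffers and convert them manually
var stream = new fs.ReadStream(__filename);
stream.on("readable", function() {
  var data = stream.read();
  if (data != null) console.log(data.toString("utf-8"));
});

// Option 2: pass the encoding when opening the stream,
// then read() returns strings directly
var stream2 = new fs.ReadStream(__filename, { encoding: "utf-8" });
stream2.on("readable", function() {
  var data = stream2.read();
  if (data != null) console.log(data);
});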

Finally, when the file is over,

fs.js

JavaScript

var fs = require("fs"); //fs.ReadStream inherits from stream.Readable var stream = new fs.ReadStream(__filename, (encoding: "utf-8")); stream.on("readable", function()( var data = stream.read(); console.log(data); )); stream.on("end", function()( console.log("THE END"); ));

stream. on("end" ,function()(

console. log("THE END") ;

} ) ;

then the 'end' event printed "THE END" to the console. Here the file ended almost immediately, because it was very small. Now I'll modify the example slightly: instead of __filename, that is, instead of the current file, I'll read the file big.html, which is located in the current directory.

The big.html file is large, so the readable event fired repeatedly, and each time we received another chunk of data as a buffer. Note also the null that keeps appearing in the output; the reason is described in the documentation: read() returns null when there is no data in the internal buffer at that moment. Returning to our buffer, let's print its size to the console and at the same time check that the value is not null.

fs.js

JavaScript

var fs = require("fs") ;

//fs.ReadStream inherits from stream.Readable

stream. on("readable", function()) (

var data = stream . read();

} ) ;

stream. on("end" ,function()(

console. log("THE END") ;

} ) ;

These numbers are nothing more than the lengths of the file chunks that were read: when the stream opens the file, it does not read the whole file at once, of course, but only a piece, which it places in its internal buffer; by default the maximum size of that buffer is sixty-four kilobytes. Until we call stream.read(), it will not read any further. After we take the data, the internal buffer is emptied and the stream can read the next chunk, and so on; the last chunk has the length of whatever data remains. This example clearly shows an important advantage of streams: they save memory. No matter how large the file is, at any moment we only hold and process one small chunk.
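The size of that internal buffer can be tuned when the stream is created; a minimal sketch, assuming we want 16-kilobyte chunks:

var fs = require("fs");

// highWaterMark sets the size of the internal buffer (in bytes),
// and therefore the maximum size of each chunk returned by read()
var stream = new fs.ReadStream("big.html", { highWaterMark: 16 * 1024 });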

fs.js

JavaScript

var fs = require("fs"); //fs.ReadStream inherits from stream.Readable var stream = new fs.ReadStream("big.html"); stream.on("readable", function()( var data = stream.read(); if(data != null)console.log(data.length); )); stream.on("end", function()( console.log("THE END"); ));

var stream = new fs . ReadStream("big.html");

The second, less obvious advantage is the versatility of the interface. Here we use a ReadStream that reads from a file, but at any time we can replace it with an arbitrary stream over our own resource, and this will not require changing the rest of the code:

fs.js

JavaScript

var fs = require("fs"); var stream = new OurStream("our resource"); stream.on("readable", function()( var data = stream.read(); if(data != null)console.log(data.length); )); stream.on("end", function()( console.log("THE END"); ));

var fs = require("fs") ;

var stream = new OurStream("our resource" ) ;

stream. on("readable", function()) (

var data = stream . read();

if (data != null ) console . log (data . length ) ;

} ) ;

stream. on("end" ,function()(

console. log("THE END") ;

} ) ;
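OurStream here is a hypothetical class; a minimal sketch of what such a stream could look like, built on top of stream.Readable (the name and the data it produces are made up for illustration):

var stream = require("stream");
var util = require("util");

// A hypothetical readable stream that emits a few chunks and then ends
function OurStream(resource) {
  stream.Readable.call(this);
  this.resource = resource;
  this.chunksLeft = 3;
}
util.inherits(OurStream, stream.Readable);

// _read() is called whenever the consumer wants more data;
// push(null) signals the end of the stream
OurStream.prototype._read = function() {
  if (this.chunksLeft-- > 0) {
    this.push("data from " + this.resource + "\n");
  } else {
    this.push(null);
  }
};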

That works because streams are, first of all, an interface: in theory, if our stream implements the necessary events and methods, in particular inherits from stream.Readable, then everything should work fine. Of course, that is only true as long as we did not rely on special capabilities that file streams have. In particular, fs.ReadStream has additional events.


Here is the schema for fs.ReadStream; the new events are shown in red.

Also, I don't know how high your counter goes, but if you fill up the buffer, it will stop passing data into the transform stream, in which case finish will never actually fire because you never reach the counter limit. Try changing your highWaterMark.

EDIT 2: A Little Better Explanation

As you may know, a transform stream is a duplex stream, which basically means it can accept data from a source and it can send data to a destination. This is commonly referred to as reading and writing, respectively. The transform stream inherits from both the readable stream and the writable stream implemented by Node.js. There is one caveat, though: the transform stream does not have to implement the _read or _write functions. In that sense, you can think of it as the lesser-known passthrough stream.

If you think of a transform stream as a writable stream, you should also remember that a writable stream always has a destination to flush its contents to. The problem you have is that when you create a transform stream, you cannot specify a place to send its contents. The only way to pass data all the way through a transform stream is to pipe it into a writable stream; otherwise, in essence, your stream backs up and can no longer accept data, because there is nowhere for the data to go.

That's why piping into a writable stream always works: the writable stream keeps the data from backing up by sending it on to a destination, so all your data flows through the pipe and the finish event gets emitted.

The reason your code works without a writable stream when the fetch size is small is that you are not filling up your stream, so the transform stream can accept enough data to reach the finish event/threshold. As the threshold grows, the amount of data your stream can accept without sending it somewhere else (a writable stream) stays the same. This causes your stream to get backed up, and it can no longer accept data, meaning the finish event will never be emitted.

I would venture to say that if you increase the highWaterMark of the transform stream, you can raise your threshold and the code will still work, but that is not the right fix. Instead, pipe your stream into a writable stream that sends the data to /dev/null; here is one way to create such a writable stream:

var writer = fs.createWriteStream("/dev/null");

See the section in the Node.js documentation on buffering for an explanation of the error you're running into.
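Putting this together, a minimal sketch (the transform stream here is a made-up uppercasing example, not the code from the question, and big.html is reused from earlier):

var fs = require("fs");
var { Transform } = require("stream");

// A made-up transform stream that uppercases whatever flows through it
var upper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

upper.on("finish", function() {
  console.log("all data has been written to the transform stream");
});

// Without the pipe into a writable destination below, a large input would
// eventually fill the transform stream's buffer and 'finish' would never fire.
var writer = fs.createWriteStream("/dev/null");
fs.createReadStream("big.html").pipe(upper).pipe(writer);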




I think you're overthinking how it all works, and I like it.

What streams are good for

Streams are good for two things:

    when the operation is slow and can give you partial results as it goes. For example, reading a file is slow because hard drives are slow, but a stream can give you parts of the file as they are read. With streams, you can take these parts of the file and start processing them right away.

    they are also good for tying programs together, just like on the command line, where you can combine different programs to get the desired result. Example: cat file | grep word. A Node.js sketch of the same idea is shown below.
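For illustration, a rough Node.js equivalent of that shell pipeline, assuming a file named file.txt (the file name and the grep-like filter are made up for the example):

var fs = require("fs");
var { Transform } = require("stream");

// Keep only the lines that contain the word "word"
// (a real implementation would also handle lines split across chunk boundaries)
var grep = new Transform({
  transform(chunk, encoding, callback) {
    var matching = chunk.toString().split("\n")
      .filter(function(line) { return line.includes("word"); })
      .join("\n");
    callback(null, matching);
  }
});

// cat file.txt | grep word
fs.createReadStream("file.txt").pipe(grep).pipe(process.stdout);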

How they work under the hood...

Most of these operations, which take time and can give you partial results as they go, are not performed by your JavaScript code itself; they are performed underneath by Node.js and the operating system, which only pass those results up to JavaScript so you can work with them.

To understand your http example you need to understand how http works

There are various ways a web page can be sent. In the beginning there was only one: the entire page was sent when it was requested. Now there are more efficient transfer encodings. One of them is chunked, where pieces of the web page are sent before the entire page has been produced. This is good because the web page can be processed as it is received. Think of a web browser: it can start rendering a website before the download is complete.
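A minimal sketch of a Node.js server that produces such a chunked response (when no Content-Length is set, Node.js sends each res.write as a separate chunk using Transfer-Encoding: chunked):

var http = require("http");

http.createServer(function(req, res) {
  res.writeHead(200, { "Content-Type": "text/html" });
  // Each write() is sent to the client as soon as possible, as its own chunk
  res.write("<h1>Header arrives first</h1>");
  setTimeout(function() {
    res.write("<p>...and the rest of the page a bit later.</p>");
    res.end();
  }, 1000);
}).listen(3000);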

Your questions and answers.

First, Node.js streams only work within a single Node.js program. A Node.js stream cannot control a stream on another server, or even in another program.

This means that in the example below, Node.js cannot talk to the web server. It cannot tell it to stop or resume sending.

Node.js<->network<->webserver

What is really happening is that Node.js requests a web page, the download starts, and there is no way to pause that download short of closing the socket.

So what actually happens when you call .pause() or .resume() in Node.js?

Node.js starts buffering the incoming data until you are ready to use it again. But the download itself never stopped.
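A minimal sketch of what that looks like in code (the URL and the timing are made up for the example):

var http = require("http");

http.get("http://example.com/", function(res) {
  res.on("data", function(chunk) {
    console.log("got", chunk.length, "bytes");
  });

  // Stop delivering 'data' events to us for a second; incoming bytes
  // keep arriving and are buffered by Node.js / the kernel in the meantime
  res.pause();
  setTimeout(function() {
    res.resume();
  }, 1000);
});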

event loop

I have a whole separate answer explaining how the event loop works, but I think you're fine without it here.

The first thing to note is that Node.js streams are not limited to HTTP requests. HTTP requests and network resources are just one example of a stream in Node.js.

Streams are useful for anything that can be processed in small chunks. They allow you to process potentially huge resources in small chunks that fit more easily into your RAM.

Let's say you have a file several gigabytes in size and you want to convert all lowercase letters to uppercase and write the result to another file. The naive approach would read the entire file using fs.readFile (error handling omitted for brevity):

fs . readFile ("my_huge_file" , function (err , data ) ( var convertedData = data . toString (). toUpperCase (); fs . writeFile ("my_converted_file" , convertedData ); ));

Unfortunately, this approach will easily overwhelm your RAM, as the entire file must be loaded before it can be processed. You also waste precious time waiting for the file to be read. Wouldn't it make more sense to process the file in small chunks? You could start processing as soon as you get the first bytes, while waiting for the hard drive to provide the rest of the data:

var readStream = fs.createReadStream("my_huge_file");
var writeStream = fs.createWriteStream("my_converted_file");

readStream.on("data", function (chunk) {
  var convertedChunk = chunk.toString().toUpperCase();
  writeStream.write(convertedChunk);
});

readStream.on("end", function () {
  writeStream.end();
});

This approach is much better:

  1. You will only be dealing with small pieces of data that will easily fit into your RAM.
  2. You start processing as soon as the first byte arrives and don't waste time doing nothing but waiting.

Once the stream is created, Node.js opens the file and starts reading from it. As soon as the operating system hands some bytes to the thread that is reading the file, they are passed along to your application.

Getting back to HTTP streams:

  1. The first problem is also relevant here: an attacker could send you large amounts of data in order to overwhelm your RAM and take down (DoS) your service.
  2. However, the second problem is even more important in this case: the network can be very slow (think smartphones) and it can take a long time for the client to send everything. By using a stream, you can start processing the request before it has fully arrived and reduce the response time.

About pausing an HTTP stream: this is not done at the HTTP layer, but below it. If you pause the stream, Node.js simply stops reading from the underlying TCP socket. What happens then is up to the kernel: it may still buffer the incoming data, so it's ready for you as soon as you finish your current work. Applications don't need to deal with this; it's none of their business. In fact, the sender application probably doesn't even realize that you're no longer actively reading!

So a stream basically provides data as soon as it's available, but without an overwhelming use of resources. The hard work underneath is done either by the operating system (e.g. net, fs, http) or by the author of the stream being used (e.g. zlib, which is a Transform stream and is usually attached to fs or net).

The diagram below is a fairly accurate 10,000-foot overview of the Node.js stream classes.

Hi all! In this article, we will look at what streams and buffers are and what their advantages are in NodeJS.

In today's world, streams and buffers are used almost everywhere. But why? Let's figure it out.

Buffer

First, let's talk about what a buffer is. A buffer is temporary storage for a piece of information that is being transferred from one place to another. The buffer is filled with a certain amount of data and then sent on to its destination. This allows information to be transferred in small pieces at a time.

What is the advantage of this approach? Imagine you have a huge box of things, say tools. The box is heavy and quite difficult to move all at once, right? Now let's split it into several small boxes with one or two tools each. Now we can carry all the tools much more easily. A buffer does the same thing: it acts as these little boxes, each holding some part of the information being transmitted, which makes our task easier.
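A small sketch of working with buffers in Node.js (the contents are arbitrary):

// Create a buffer from a string and inspect it
var buf = Buffer.from("Hello, streams!", "utf-8");
console.log(buf.length);             // number of bytes
console.log(buf.toString("utf-8"));  // back to a string

// Several small buffers can be glued together into one
var joined = Buffer.concat([Buffer.from("Hello, "), Buffer.from("world")]);
console.log(joined.toString()); // "Hello, world"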

Stream

Now let's talk about what streams are. A stream is data moving from one point to another over time. We have some amount of data, it is broken into small pieces which travel over the communication channel into a buffer, and when the buffer is full, its contents are sent on to the client and processed. All of this together is a stream.

Where applicable

As I said at the beginning of the article, streams and buffers are used almost everywhere in the modern world because, as we now know, they significantly improve performance. Probably the most common example is online video. When you watch a video on the Internet, you do not wait for it to load completely; you start watching right away while the rest of the video keeps loading. This works precisely thanks to streams and buffers.

Conclusion

So, today we looked at what a stream and a buffer are in NodeJS, how they work and why they are used. In the following articles, we will start writing and reading our own streams.



Quite recently, version 10.5.0 of the Node.js platform was released. One of its main features was the first, still experimental, support for working with threads in Node.js. This is especially interesting in light of the fact that the feature has appeared on a platform whose adherents have always been proud that it does not need threads, thanks to its fantastic asynchronous I/O subsystem. Nevertheless, thread support did appear in Node.js. Why did that happen? Who can it be useful to, and for what?

In a nutshell, it is needed so that the Node.js platform can reach new heights in areas where it previously showed less than remarkable results: performing computations that make intensive use of the CPU. This is the main reason why Node.js is not strong in areas such as artificial intelligence, machine learning, and big data processing. A lot of effort has gone into letting Node.js perform well on such problems, but there the platform still looks much more modest than, for example, in the development of microservices.

The author of the material we are publishing in translation says that he decided to boil the technical documentation, which can be found in the original pull request, down to a set of simple practical examples. He hopes that anyone who understands these examples will learn enough to get started with worker threads in Node.js.

About the worker_threads module and the --experimental-worker flag

Multithreading support in Node.js is implemented in the worker_threads module. Therefore, to take advantage of the new feature, this module must be loaded with require.

Please note that worker_threads is only available when the script is run with the --experimental-worker flag; otherwise the system will not find this module.

Note that the flag contains the word "worker", not "thread". This is exactly how the feature is referred to in the documentation, which uses the terms "worker thread" or simply "worker". We will follow the same approach below.
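A minimal sketch of loading the module and running a script with the flag (the file name is arbitrary):

// workers.js
const { Worker, isMainThread } = require("worker_threads");
console.log(isMainThread ? "main thread" : "worker thread");

// run it as:
//   node --experimental-worker workers.js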

About tasks that can be solved using workers in Node.js

Worker threads are intended, as already mentioned, for tasks that make intensive use of the CPU. Using them for I/O tasks is a waste of resources, since, according to the official documentation, the internal mechanisms Node.js uses to organize asynchronous I/O are far more efficient on their own than worker threads would be for the same job. So we will decide right away not to do input-output with workers.

Let's start with a simple example that demonstrates how to create and use workers.

Example #1

const { Worker, isMainThread, workerData } = require("worker_threads");

let currentVal = 0;
// tick intervals in ms (assumed values; the original listing lost them);
// each thread picks a different one below, so the counters tick at different rates
let intervals = [100, 1000, 500];

function counter(id, i) {
  console.log("[", id, "]", i);
  return i;
}

if (isMainThread) {
  console.log("this is the main thread");
  for (let i = 0; i < 2; i++) {
    let w = new Worker(__filename, { workerData: i });
  }
  setInterval((a) => currentVal = counter(a, currentVal + 1), intervals[2], "MainThread");
} else {
  console.log("this isn't");
  setInterval((a) => currentVal = counter(a, currentVal + 1), intervals[workerData], workerData);
}
The output of this code will look like a set of lines showing counters that increase at different rates.


Results of the first example

Let's figure out what's going on here:

  1. The instructions inside the if statement create two worker threads whose code, thanks to the __filename parameter, is taken from the same script file that was passed to Node.js when the example was run. Workers currently need the full path to their code file; they do not support relative paths, which is why this value is used here.
  2. Data is sent to these two workers as a global-like parameter, in the form of the workerData property of the second argument. That value can then be accessed inside the worker via the constant of the same name (note how the constant is created in the first line of the file, and how it is used in the last line).
This is a very simple example of using the worker_threads module; nothing interesting happens here yet. So let's look at another example.

Example #2

Let's consider an example in which, first, we perform some "heavy" computation and, second, do something asynchronous in the main thread.

const { Worker, isMainThread, parentPort, workerData } = require("worker_threads");
const request = require("request");

if (isMainThread) {
  console.log("This is the main thread");

  let w = new Worker(__filename, { workerData: null });
  w.on("message", (msg) => { // a message from the worker
    console.log("First value is: ", msg.val);
    console.log("Took: ", (msg.timeDiff / 1000), " seconds");
  });
  w.on("error", console.error);
  w.on("exit", (code) => {
    if (code != 0)
      console.error(new Error(`Worker stopped with exit code ${code}`));
  });

  request.get("http://www.google.com", (err, resp) => {
    if (err) {
      return console.error(err);
    }
    console.log("Total bytes received: ", resp.body.length);
  });
} else { // worker code
  function random(min, max) {
    return Math.random() * (max - min) + min;
  }

  const sorter = require("./list-sorter");

  const start = Date.now();
  let bigList = Array(1000000).fill().map((_) => random(1, 10000));

  sorter.sort(bigList);
  parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start });
}
To run this example yourself, note that the code needs the request module (it can be installed with npm, for example by running npm init --yes and npm install request --save in an empty directory containing the file with the above code), and that it uses a helper module loaded with const sorter = require("./list-sorter");. The file of that module (list-sorter.js) should sit next to the file above, and its code looks like this:

module.exports = {
  firstValue: null,
  sort: function(list) {
    let sorted = list.sort();
    this.firstValue = sorted[0]; // remember the first element of the sorted list
  }
};
This time we solve two problems in parallel: first, we load the google.com home page; second, we sort a randomly generated array of a million numbers. The sorting can take a few seconds, which gives us a great opportunity to see the new Node.js mechanisms in action. In addition, we measure the time it takes the worker thread to sort the numbers, and then send the result of the measurement (along with the first element of the sorted array) to the main thread, which prints it to the console.


The result of the second example

The most important thing this example demonstrates is the mechanism for exchanging data between threads.
The main thread receives messages from the worker thanks to the on("message") handler on the worker instance; the message event fires every time the worker sends something with parentPort.postMessage. The same mechanism works in the other direction: the main thread can send messages to the worker by calling postMessage on the worker instance, and the worker receives them through the parentPort object.
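A minimal sketch of that two-way exchange (file names and message contents are made up for illustration):

// main.js
const { Worker } = require("worker_threads");

const w = new Worker(__dirname + "/worker.js");
w.on("message", (msg) => console.log("from worker:", msg));
w.postMessage("ping");

// worker.js
const { parentPort } = require("worker_threads");

parentPort.on("message", (msg) => {
  parentPort.postMessage(msg + " -> pong");
});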

Now let's look at another example, very similar to what we have already seen, but this time we will pay special attention to the structure of the project.

Example #3

As the last example, we will implement the same functionality as in the previous example, but this time we will improve the structure of the code, make it cleaner, and bring it to a form that improves the maintainability of the project.

Here is the main program code.

const { Worker, isMainThread, parentPort, workerData } = require("worker_threads");
const request = require("request");

function startWorker(path, cb) {
  let w = new Worker(path, { workerData: null });
  w.on("message", (msg) => {
    cb(null, msg);
  });
  w.on("error", cb);
  w.on("exit", (code) => {
    if (code != 0)
      console.error(new Error(`Worker stopped with exit code ${code}`));
  });
  return w;
}

console.log("this is the main thread");

let myWorker = startWorker(__dirname + "/workerCode.js", (err, result) => {
  if (err) return console.error(err);
  console.log("[]");
  console.log("First value is: ", result.val);
  console.log("Took: ", (result.timeDiff / 1000), " seconds");
});

const start = Date.now();
request.get("http://www.google.com", (err, resp) => {
  if (err) {
    return console.error(err);
  }
  console.log("Total bytes received: ", resp.body.length);
  //myWorker.postMessage({ finished: true, timeDiff: Date.now() - start }); // this is how you could send messages to the worker
});
And here is the code describing the behavior of the worker thread (in the program above, the path to the file with this code is built with the __dirname + "/workerCode.js" construct):

const { parentPort } = require("worker_threads");

function random(min, max) {
  return Math.random() * (max - min) + min;
}

const sorter = require("./list-sorter");

const start = Date.now();
let bigList = Array(1000000).fill().map((_) => random(1, 10000));

/** // here's how to get a message from the main thread:
parentPort.on("message", (msg) => {
  console.log("Main thread finished on: ", (msg.timeDiff / 1000), " seconds...");
});
*/

sorter.sort(bigList);
parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start });
Here are the features of this example:

  1. The code for the main thread and the worker thread is now placed in different files. This makes the project easier to maintain and extend.
  2. The startWorker function returns the new worker instance, which allows us, if necessary, to send messages to that worker from the main thread.
  3. There is no longer any need to check whether the code is running on the main thread (we removed the if statement with the corresponding check).
  4. The worker file contains a commented-out fragment that demonstrates how to receive messages from the main thread; together with the message-sending mechanism already shown, this allows two-way asynchronous data exchange between the main thread and the worker thread.

Results

In this material we used practical examples to examine the new facilities for working with worker threads in Node.js. If you have mastered what was discussed here, you are ready to start your own experiments with the worker_threads module after looking at the documentation. It is worth remembering that this feature has only just appeared in Node.js and is still experimental, so its implementation may change over time. Also, if you encounter bugs in your own experiments with worker_threads, or find that the module is missing some feature you need, please let the developers know and help improve the Node.js platform.

Dear readers! What do you think of Node.js multithreading support? Do you plan to use this feature in your projects?