Introduction to pysheds#

In this tutorial, we’re going to cover a Python package that can be used to determine the catchment area and water resources of a potential run-of-river power plant:

pysheds: Simple and fast watershed delineation in python

Pysheds is an open-source library designed for processing digital elevation models (DEMs), with a particular focus on hydrologic analysis. It implements many of the basic hydrologic functions, including catchment delineation and flow accumulation computation.

Note

Documentation for pysheds is available at http://mattbartos.com/pysheds/.

Note

If you have not yet set up Python on your computer, you can execute this tutorial in your browser via Google Colab. Click on the rocket in the top right corner and launch “Colab”. If that doesn’t work, download the .ipynb file and import it into Google Colab.

Then install the required packages by executing the following command in a Jupyter cell at the top of the notebook.

!pip install pysheds matplotlib

Note

This tutorial is based on the following resources:

Data Downloads#

Before we can run this tutorial, we have to download a few input data files to your local filesystem. The cell below uses some Python utility packages to automate the retrieval of the required files.

from urllib.request import urlretrieve
from os.path import basename

url = "https://tubcloud.tu-berlin.de/s/ZPiZjX3Nzc6R6RX/download/output_COP30.tif"
urlretrieve(url, basename(url))

Imports#

First, let’s bring in the Grid class from the pysheds library, which will be our primary tool for managing and analyzing grid-based spatial data like elevation models in our hydrological computations…

from pysheds.grid import Grid

… alongside a few additional Python packages you already know from previous tutorials.

import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import numpy as np
import cartopy.crs as ccrs
import cartopy.feature as cfeature

Read Digital Elevation Model (DEM)#

A Digital Elevation Model (DEM) is a 3D representation of a terrain’s surface created from terrain elevation data, and it’s the main dataset we’ll read into our analysis to model the watershed’s topography.

For this example, data for a small part of Northern Portugal is taken from the Copernicus Global Digital Elevation Model, which offers global coverage at a resolution of 30 metres.

European Space Agency, Sinergise (2021). Copernicus Global Digital Elevation Model. Distributed by OpenTopography. https://doi.org/10.5069/G9028PQB.

fn = "output_COP30.tif"
grid = Grid.from_raster(fn)
dem = grid.read_raster(fn)
fig = plt.figure()
ax = plt.axes(projection=ccrs.PlateCarree())
plt.imshow(dem, extent=grid.extent, cmap="terrain")
plt.colorbar(label="Elevation (m)", orientation="horizontal")
ax.coastlines()
ax.add_feature(cfeature.BORDERS)

Conditioning the elevation data#

Raw DEMs often contain artifacts (such as pits, depressions and flat regions) that prevent the DEM from fully draining.

  • Pits and Depressions consist of cells or larger regions of cells for which every surrounding cell is at a higher elevation.

  • Flats consist of cells at which every surrounding cell is at the same elevation or higher.

The presence of depressions and flats means that water would pool there and fail to drain downstream, which breaks the flow routing.

These artifacts need to be removed before the dataset can be used. The following code raises the affected cells to the elevation of their surroundings so that the conditioned DEM drains fully.

pit_filled_dem = grid.fill_pits(dem)
flooded_dem = grid.fill_depressions(pit_filled_dem)
conditioned_dem = grid.resolve_flats(flooded_dem)

After conditioning the data by filling depressions and resolving flats, the flow direction can be determined.
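To see what a pit is in array terms, here is a minimal NumPy sketch (independent of pysheds, which provides its own detection routines) that flags interior cells lower than all eight of their neighbours:

```python
import numpy as np

def detect_pits(dem):
    """Return a boolean mask of interior cells lower than all 8 neighbours."""
    pits = np.zeros_like(dem, dtype=bool)
    core = dem[1:-1, 1:-1]
    is_pit = np.ones_like(core, dtype=bool)
    # Compare each interior cell against each of its eight neighbours
    for dr in (-1, 0, 1):
        for dc in (-1, 0, 1):
            if dr == 0 and dc == 0:
                continue
            neigh = dem[1 + dr:dem.shape[0] - 1 + dr, 1 + dc:dem.shape[1] - 1 + dc]
            is_pit &= core < neigh
    pits[1:-1, 1:-1] = is_pit
    return pits

dem_toy = np.array([
    [5.0, 5.0, 5.0],
    [5.0, 1.0, 5.0],   # the centre cell is a pit
    [5.0, 5.0, 5.0],
])
print(detect_pits(dem_toy))
```

Filling such a cell up to the level of its lowest neighbour is exactly what `fill_pits` does; `fill_depressions` handles the multi-cell analogue.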

Computing flow directions#

Flow directions are computed to capture the topology of the drainage network, and are needed for delineating catchment areas and computing the accumulation of flow. By default, pysheds will compute flow directions using the D8 routing scheme (ESRI). In this routing mode, each cell is routed to one of eight neighboring cells based on the direction of steepest descent:

  • North: 64

  • Northeast: 128

  • East: 1

  • Southeast: 2

  • South: 4

  • Southwest: 8

  • West: 16

  • Northwest: 32

flowdir = grid.flowdir(conditioned_dem)
np.unique(flowdir)
plt.imshow(flowdir, extent=grid.extent, cmap="cividis")
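Each D8 code above simply identifies one of the eight neighbouring cells. As an illustration of the encoding (not of the pysheds internals), the codes translate to row/column offsets like this:

```python
# Map ESRI D8 codes to (row, col) offsets; rows increase downwards.
D8_OFFSETS = {
    64: (-1, 0),   # North
    128: (-1, 1),  # Northeast
    1: (0, 1),     # East
    2: (1, 1),     # Southeast
    4: (1, 0),     # South
    8: (1, -1),    # Southwest
    16: (0, -1),   # West
    32: (-1, -1),  # Northwest
}

def downstream_cell(row, col, code):
    """Return the (row, col) of the cell that (row, col) drains into."""
    dr, dc = D8_OFFSETS[code]
    return row + dr, col + dc

print(downstream_cell(5, 5, 2))  # drains southeast, to (6, 6)
```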

Computing flow accumulation#

The accumulated flow is the number of upstream cells that contribute flow to each cell in the DEM grid. The result of this calculation is a grid where each cell’s value represents the count of all cells that flow into it, directly or indirectly. In essence, it gives you an idea of the potential water volume that would pass through each point if each cell contributed a unit amount of water.

The flow accumulation process takes the flow direction grid as input and works by tracking the flow from each cell to its downslope neighbours. It aggregates the number of cells flowing into each downslope cell, essentially “accumulating” the flow as it moves through the drainage network. Hence, the count increases as you move downstream and decreases as you move upstream.

acc = grid.accumulation(flowdir)
fig, ax = plt.subplots()
im = ax.imshow(
    acc,
    extent=grid.extent,
    cmap="cubehelix",
    norm=LogNorm(1, acc.max()),
    interpolation="bilinear",
)
plt.colorbar(im, ax=ax, label="Upstream Cells", orientation="horizontal")
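To build intuition for the counting, here is a tiny hand-worked sketch on a single row of cells that all drain east (with the convention that each cell counts itself, so the count grows by one per step downstream):

```python
import numpy as np

def accumulate_east(ncells):
    """Flow accumulation for a single row where every cell drains east.

    Each cell contributes one unit and passes along everything it
    received from upstream.
    """
    acc = np.ones(ncells, dtype=int)   # every cell contributes itself
    for i in range(1, ncells):
        acc[i] += acc[i - 1]           # receive everything from upstream
    return acc

print(accumulate_east(4))  # → [1 2 3 4]
```

On a real D8 grid the same idea applies, except each cell may receive flow from up to eight neighbours, so the accumulation follows the branching structure of the drainage network.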

Computing catchment areas#

The method to calculate the catchment area also operates on the flow direction grid.

To delineate a catchment, first specify a pour point (the outlet of the catchment area, e.g. the weir of a run-of-river power plant).

However, sometimes the pour point is not known exactly. In this case, it can be helpful to first compute the accumulated flow and then snap a roughly estimated pour point to the nearest high accumulation cell.

For instance, we may specify a trial pour point…

x, y = -7.954, 41.47

… then snap the pour point to a high accumulation cell with at least 5000 upstream cells:

x_snap, y_snap = grid.snap_to_mask(acc > 5000, (x, y))
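Conceptually, snapping just finds the nearest cell that satisfies the mask. A minimal NumPy sketch of the idea, working in array-index space for simplicity (pysheds’ `snap_to_mask` works in the grid’s spatial coordinates instead):

```python
import numpy as np

def snap_to_mask(mask, point):
    """Return the (row, col) of the True cell in `mask` nearest to `point`."""
    rows, cols = np.nonzero(mask)
    # Squared Euclidean distance from the trial point to each candidate cell
    dist2 = (rows - point[0]) ** 2 + (cols - point[1]) ** 2
    best = np.argmin(dist2)
    return int(rows[best]), int(cols[best])

mask = np.zeros((5, 5), dtype=bool)
mask[1, 1] = mask[4, 4] = True
print(snap_to_mask(mask, (2, 2)))  # nearest True cell is (1, 1)
```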

We can then use this snapped point to delineate the catchment area:

catch = grid.catchment(x=x_snap, y=y_snap, fdir=flowdir, xytype="coordinate")

Note

  • If the x and y components of the pour point are spatial coordinates in the grid’s spatial reference system, specify xytype='coordinate'.

  • If the x and y components of the pour point correspond to the row and column indices of the flow direction array, specify xytype='index'.

We can then clip our grid to the catchment area and plot the result:

grid.clip_to(catch)
clipped_catch = grid.view(catch)
clipped_accumulated_flow = grid.accumulation(flowdir)
fig, ax = plt.subplots()
im = ax.imshow(
    np.where(clipped_catch, clipped_accumulated_flow, np.nan),
    extent=grid.extent,
    cmap="cubehelix",
    norm=LogNorm(1, clipped_accumulated_flow.max()),
    interpolation="bilinear",
)
plt.colorbar(
    im,
    ax=ax,
    label="Upstream Cells",
)
plt.scatter([x_snap], [y_snap], c="r", s=50)

Where to go from here?#

Explore datasets of global hydro basins and what they are used for:

https://www.hydrosheds.org/

The HydroSHEDS database offers a suite of global digital data layers in support of hydro-ecological research and applications worldwide. Its various hydrographic data products include catchment boundaries, river networks, and lakes at multiple resolutions and scales. HydroSHEDS data are freely available in standard GIS formats and form the geospatial framework for a broad range of assessments including hydrological, environmental, conservation, socioeconomic, and human health applications.

https://www.hydrosheds.org/applications/ssp-hydropower

HydroSHEDS data have been used to assess configuration scenarios for future hydropower development at global, national (e.g., Zambia, Nepal) or basin-level (e.g., Marañón river) scales. This ‘system-scale planning’ (SSP) approach provides a large-scale perspective which considers the complex interactions and cumulative effects of different dam combinations rather than determining the impacts of each dam project individually. This type of holistic assessment takes the regional and downstream effects of the entire hydropower scenario into account and allows for strategic planning by comparing various national or international development options. It is particularly useful during the early phases of hydropower portfolio design and can facilitate the dialog between stakeholders and policymakers as a decision support tool.

Calculate water flow volume at pour point using meteorological data:

With the delineated catchment area from pysheds, you can utilize surface runoff data, which represents the part of precipitation and snowmelt that flows over the land without infiltrating the soil or evaporating, to estimate the total flow at the basin’s pour point. Surface runoff is typically given in volume per area per time (e.g. millimetres per day); summing it over all cells within the catchment yields an approximation of the total water volume contributing to the flow at the pour point. This integration accounts for spatial variations in runoff generation across the catchment and provides an aggregated flow estimate at the location where the water exits the basin, for instance, the amount of water available for electricity generation in a run-of-river plant.
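As a rough sketch of that integration (all numbers below are hypothetical placeholders, not real data), converting a daily runoff depth summed over catchment cells into a volumetric flow at the pour point:

```python
import numpy as np

# Hypothetical inputs: runoff depth per cell in mm/day, on a 30 m grid.
runoff_mm_per_day = np.full((100, 100), 3.0)      # uniform 3 mm/day, for illustration
in_catchment = np.ones((100, 100), dtype=bool)    # mask from the delineation step
cell_area_m2 = 30.0 * 30.0

# Depth (mm/day) x area (m2) -> volume (m3/day); 1 mm = 1e-3 m.
volume_m3_per_day = np.sum(runoff_mm_per_day[in_catchment]) * 1e-3 * cell_area_m2
volume_m3_per_s = volume_m3_per_day / 86400       # 86400 seconds per day

print(f"{volume_m3_per_s:.4f} m³/s at the pour point")
```

In practice the runoff field would come from a reanalysis or meteorological dataset regridded to the DEM resolution, and the catchment mask from the `grid.catchment` step above.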

Exercises#

Determine a few additional catchment areas by varying the pour point and the threshold for the accumulated flow.