Introduction to pysheds#
In this tutorial, we’re going to cover a new Python package one can use to determine the catchment area and hydropower resource of a potential run-of-river power plant:
pysheds: Simple and fast watershed delineation in python
Pysheds is an open-source library for processing digital elevation models (DEMs), particularly for hydrologic analysis. It implements many of the basic hydrologic functions, including catchment delineation and flow accumulation computation.
Note
Documentation for pysheds
is available at http://mattbartos.com/pysheds/.
Note
If you have not yet set up Python on your computer, you can execute this tutorial in your browser via Google Colab. Click on the rocket in the top right corner and launch “Colab”. If that doesn’t work, download the .ipynb
file and import it into Google Colab.
Then install the required packages by executing the following command in a Jupyter cell at the top of the notebook.
!pip install pysheds matplotlib
Note
The following tutorial is based on the following resources:
Data Downloads#
Before we can run this tutorial, we have to download a few input data files to your local filesystem. The cell below uses some Python utility packages to automate the retrieval of the required files.
from urllib.request import urlretrieve
from os.path import basename
url = "https://tubcloud.tu-berlin.de/s/ZPiZjX3Nzc6R6RX/download/output_COP30.tif"
urlretrieve(url, basename(url))
Imports#
First, let’s bring in the Grid
class from the pysheds
library, which will be our primary tool for managing and analyzing grid-based spatial data like elevation models in our hydrological computations…
from pysheds.grid import Grid
… alongside a few additional Python packages you already know from previous tutorials.
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import numpy as np
import cartopy.crs as ccrs
import cartopy.feature as cfeature
Read Digital Elevation Model (DEM)#
A Digital Elevation Model (DEM) is a 3D representation of a terrain’s surface created from terrain elevation data, and it’s the main dataset we’ll read into our analysis to model the watershed’s topography.
For this example, data for a small part of Northern Portugal is taken from the Copernicus Global Digital Elevation Model, which offers global coverage at a resolution of 30 metres.
European Space Agency, Sinergise (2021). Copernicus Global Digital Elevation Model. Distributed by OpenTopography. https://doi.org/10.5069/G9028PQB.
fn = "output_COP30.tif"
grid = Grid.from_raster(fn)
dem = grid.read_raster(fn)
fig = plt.figure()
ax = plt.axes(projection=ccrs.PlateCarree())
plt.imshow(dem, extent=grid.extent, cmap="terrain")
plt.colorbar(label="Elevation (m)", orientation="horizontal")
ax.coastlines()
ax.add_feature(cfeature.BORDERS)
Conditioning the elevation data#
Raw DEMs often contain artifacts (such as pits, depressions and flat regions) that prevent the DEM from fully draining.
Pits and depressions are cells or larger regions of cells for which every surrounding cell is at a higher elevation.
Flats are cells for which every surrounding cell is at the same elevation or higher.
Water would pool in depressions and flats instead of continuing downslope, breaking the continuous flow paths needed for routing.
These artifacts need to be removed before the dataset can be used. The following code fills the affected cells based on the elevations of their surroundings so that every cell drains.
pit_filled_dem = grid.fill_pits(dem)
flooded_dem = grid.fill_depressions(pit_filled_dem)
conditioned_dem = grid.resolve_flats(flooded_dem)
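For intuition, a toy version of pit filling (a minimal sketch, not pysheds’ actual implementation) raises every single-cell pit to the elevation of its lowest neighbour:

```python
import numpy as np

def fill_single_cell_pits(dem):
    """Raise each interior cell that is lower than all 8 neighbours
    to the elevation of its lowest neighbour (toy illustration)."""
    out = dem.astype(float)
    rows, cols = dem.shape
    for i in range(1, rows - 1):
        for j in range(1, cols - 1):
            neighbours = np.concatenate([
                dem[i - 1, j - 1:j + 2],   # row above
                dem[i, [j - 1, j + 1]],    # left and right
                dem[i + 1, j - 1:j + 2],   # row below
            ])
            if dem[i, j] < neighbours.min():
                out[i, j] = neighbours.min()
    return out

dem = np.array([
    [5., 5., 5.],
    [5., 1., 5.],   # single-cell pit in the centre
    [5., 5., 5.],
])
filled = fill_single_cell_pits(dem)
print(filled[1, 1])  # the pit is raised to 5.0
```

The real fill_pits additionally handles grid edges and nodata cells, while fill_depressions and resolve_flats deal with the multi-cell cases.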
After conditioning the data by filling depressions and resolving flats, the flow direction can be determined.
Computing flow directions#
Flow directions are computed to capture the topology of the drainage network,
and are needed for delineating catchment areas and computing the accumulation of
flow. By default, pysheds
will compute flow directions using the D8 routing
scheme (ESRI). In this routing mode, each cell is routed to one of eight
neighboring cells based on the direction of steepest descent:
North: 64
Northeast: 128
East: 1
Southeast: 2
South: 4
Southwest: 8
West: 16
Northwest: 32
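To make the mapping concrete, here is a minimal sketch of the D8 rule for a single cell, using the ESRI codes listed above (a toy illustration, not pysheds’ implementation):

```python
import numpy as np

# ESRI D8 codes keyed by (row_offset, col_offset) of each neighbour
D8 = {(-1, 0): 64, (-1, 1): 128, (0, 1): 1, (1, 1): 2,
      (1, 0): 4, (1, -1): 8, (0, -1): 16, (-1, -1): 32}

def d8_direction(dem, i, j):
    """Return the D8 code of the steepest-descent neighbour of cell (i, j)."""
    best_code, best_drop = 0, 0.0
    for (di, dj), code in D8.items():
        dist = np.hypot(di, dj)  # 1 for edge neighbours, sqrt(2) for corners
        drop = (dem[i, j] - dem[i + di, j + dj]) / dist
        if drop > best_drop:
            best_code, best_drop = code, drop
    return best_code

dem = np.array([
    [9., 9., 9.],
    [9., 5., 9.],
    [9., 9., 1.],   # lowest point lies to the southeast
])
print(d8_direction(dem, 1, 1))  # flows southeast -> 2
```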
flowdir = grid.flowdir(conditioned_dem)
np.unique(flowdir)
plt.imshow(flowdir, extent=grid.extent, cmap="cividis")
Computing flow accumulation#
The accumulated flow is the number of upstream cells that contribute flow to each cell in the DEM grid. The result of this calculation is a grid where each cell’s value represents the count of all cells that flow into it, directly or indirectly. In essence, it gives you an idea of the potential water volume that would pass through each point if each cell contributed a unit amount of water.

The flow accumulation process takes the flow direction grid as input and works by tracking the flow from each cell to its downslope neighbours. It aggregates the number of cells flowing into each downslope cell, essentially “accumulating” the flow as it moves through the drainage network. Hence, the count increases as you move downstream and decreases as you move upstream.
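The counting described above can be illustrated on a tiny flow direction grid (a toy path-following version; pysheds uses a much faster algorithm):

```python
import numpy as np

# Downstream (row_offset, col_offset) for each ESRI D8 code
OFFSET = {64: (-1, 0), 128: (-1, 1), 1: (0, 1), 2: (1, 1),
          4: (1, 0), 8: (1, -1), 16: (0, -1), 32: (-1, -1)}

def accumulation(fdir):
    """Count upstream cells (including the cell itself) by following
    each cell's flow path downstream. Assumes no cycles, which is
    guaranteed after DEM conditioning."""
    acc = np.zeros(fdir.shape, dtype=int)
    rows, cols = fdir.shape
    for i in range(rows):
        for j in range(cols):
            r, c = i, j
            acc[r, c] += 1
            while fdir[r, c] in OFFSET:
                dr, dc = OFFSET[fdir[r, c]]
                r, c = r + dr, c + dc
                if not (0 <= r < rows and 0 <= c < cols):
                    break
                acc[r, c] += 1
    return acc

# All cells drain east; the last cell is an outlet (code 0)
fdir = np.array([[1, 1, 0]])
print(accumulation(fdir))  # [[1 2 3]]
```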
acc = grid.accumulation(flowdir)
fig, ax = plt.subplots()
im = ax.imshow(
acc,
extent=grid.extent,
cmap="cubehelix",
norm=LogNorm(1, acc.max()),
interpolation="bilinear",
)
plt.colorbar(im, ax=ax, label="Upstream Cells", orientation="horizontal")
Computing catchment areas#
The method to calculate the catchment area also operates on the flow direction grid.
To delineate a catchment, first specify a pour point (the outlet of the catchment area, e.g. the weir of a run-of-river power plant).
However, sometimes the pour point is not known exactly. In this case, it can be helpful to first compute the accumulated flow and then snap a roughly estimated pour point to the nearest high accumulation cell.
For instance, we may specify a trial pour point…
x, y = -7.954, 41.47
… then snap the pour point to a high accumulation cell with at least 5000 upstream cells:
x_snap, y_snap = grid.snap_to_mask(acc > 5000, (x, y))
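Conceptually, the snapping step just picks the masked cell closest to the trial point. A minimal sketch in array-index space (pysheds’ snap_to_mask works with spatial coordinates):

```python
import numpy as np

def snap_to_mask(mask, point):
    """Return the (row, col) of the True cell in `mask` nearest to the
    given (row, col) point -- toy index-space version."""
    candidates = np.argwhere(mask)  # (row, col) of all True cells
    dists = np.linalg.norm(candidates - np.asarray(point), axis=1)
    return tuple(candidates[np.argmin(dists)])

acc = np.array([
    [10,   20, 30],
    [15, 9000, 40],
    [12,   25, 50],
])
print(snap_to_mask(acc > 5000, (0, 0)))  # nearest high-accumulation cell: (1, 1)
```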
We can then use this point to delineate the catchment area:
catch = grid.catchment(x=x_snap, y=y_snap, fdir=flowdir, xytype="coordinate")
Note
If the x and y components of the pour point are spatial coordinates in the grid’s spatial reference system, specify xytype='coordinate'.
If the x and y components of the pour point correspond to the row and column indices of the flow direction array, specify xytype='index'.
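The conversion between the two conventions is an affine transform. A minimal sketch with a hypothetical grid origin and resolution (the actual values come from the raster’s metadata):

```python
# Hypothetical grid metadata: upper-left corner and cell size in degrees
x0, y0 = -8.0, 41.6                # coordinates of the upper-left corner
dx, dy = 0.00027778, -0.00027778   # ~30 m cells; dy is negative (north up)

def coord_to_index(x, y):
    """Convert spatial coordinates (x, y) to array indices (row, col)."""
    col = int((x - x0) / dx)
    row = int((y - y0) / dy)
    return row, col

print(coord_to_index(-7.954, 41.47))
```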
We can then clip our grid to the catchment area and plot the result:
grid.clip_to(catch)
clipped_catch = grid.view(catch)
clipped_accumulated_flow = grid.accumulation(flowdir)
fig, ax = plt.subplots()
im = ax.imshow(
np.where(clipped_catch, clipped_accumulated_flow, np.nan),
extent=grid.extent,
cmap="cubehelix",
norm=LogNorm(1, clipped_accumulated_flow.max()),
interpolation="bilinear",
)
plt.colorbar(
im,
ax=ax,
label="Upstream Cells",
)
plt.scatter([x_snap], [y_snap], c="r", s=50)
Where to go from here?#
Explore datasets of global hydro basins and what they are used for:
The HydroSHEDS database offers a suite of global digital data layers in support of hydro-ecological research and applications worldwide. Its various hydrographic data products include catchment boundaries, river networks, and lakes at multiple resolutions and scales. HydroSHEDS data are freely available in standard GIS formats and form the geospatial framework for a broad range of assessments including hydrological, environmental, conservation, socioeconomic, and human health applications.
https://www.hydrosheds.org/applications/ssp-hydropower
HydroSHEDS data have been used to assess configuration scenarios for future hydropower development at global, national (e.g., Zambia, Nepal) or basin-level (e.g., Marañón river) scales. This ‘system-scale planning’ (SSP) approach provides a large-scale perspective which considers the complex interactions and cumulative effects of different dam combinations rather than determining the impacts of each dam project individually. This type of holistic assessment takes the regional and downstream effects of the entire hydropower scenario into account and allows for strategic planning by comparing various national or international development options. It is particularly useful during the early phases of hydropower portfolio design and can facilitate the dialog between stakeholders and policymakers as a decision support tool.
Calculate water flow volume at pour point using meteorological data:
With the delineated catchment area from pysheds, you can use surface runoff data to estimate the total flow at the basin’s pour point. Surface runoff represents the part of precipitation and snowmelt that flows over the land without infiltrating the soil or evaporating, and is typically given in volume per area per time (e.g. millimetres per day). By integrating the runoff over all cells within the catchment, you can approximate the total water volume contributing to the flow at the pour point. This integration accounts for spatial variations in runoff generation across the catchment, providing an aggregated flow estimate at the specific location where the water exits the basin, for instance, the amount of water available for electricity generation in a run-of-river plant.
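The integration step can be sketched with hypothetical runoff values and a boolean catchment mask (the mask would come from the catchment delineated above; the numbers here are made up):

```python
import numpy as np

# Hypothetical inputs: surface runoff in mm/day per cell, a boolean
# catchment mask, and 30 m x 30 m cells
runoff_mm_day = np.array([
    [2.0, 3.0],
    [1.0, 4.0],
])
catchment = np.array([
    [True,  True],
    [False, True],
])
cell_area_m2 = 30.0 * 30.0

# mm/day -> m/day, then integrate over all catchment cells
flow_m3_day = np.sum(runoff_mm_day[catchment] / 1000.0 * cell_area_m2)
print(flow_m3_day)  # (2 + 3 + 4) mm/day over 900 m2 cells = 8.1 m3/day
```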
Exercises#
Determine a few additional catchment areas by varying the pour point and the threshold for the accumulated flow.